http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java deleted file mode 100644 index a2ca603..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicy.java +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.eviction.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.internal.processors.fs.*; -import org.jdk8.backport.*; -import org.jdk8.backport.ConcurrentLinkedDeque8.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; -import java.util.regex.*; - -/** - * GGFS eviction policy which evicts particular blocks. - */ -public class GridCacheGgfsPerBlockLruEvictionPolicy implements GridCacheEvictionPolicy<GridGgfsBlockKey, byte[]>, - GridCacheGgfsPerBlockLruEvictionPolicyMBean { - /** Meta denoting node in the queue. */ - public static final String META_NODE = "ggfs_node"; - - /** Maximum size. When reached, eviction begins. */ - private volatile long maxSize; - - /** Maximum amount of blocks. When reached, eviction begins. */ - private volatile int maxBlocks; - - /** Collection of regex for paths which must not be evicted. */ - private volatile Collection<String> excludePaths; - - /** Exclusion patterns. */ - private volatile Collection<Pattern> excludePatterns; - - /** Whether patterns must be recompiled during the next call. */ - private final AtomicBoolean excludeRecompile = new AtomicBoolean(true); - - /** Queue. */ - private final ConcurrentLinkedDeque8<GridCacheEntry<GridGgfsBlockKey, byte[]>> queue = - new ConcurrentLinkedDeque8<>(); - - /** Current size of all enqueued blocks in bytes. */ - private final LongAdder curSize = new LongAdder(); - - /** - * Default constructor. - */ - public GridCacheGgfsPerBlockLruEvictionPolicy() { - // No-op. - } - - /** - * Constructor. - * - * @param maxSize Maximum size. When reached, eviction begins. - * @param maxBlocks Maximum amount of blocks. When reached, eviction begins. - */ - public GridCacheGgfsPerBlockLruEvictionPolicy(long maxSize, int maxBlocks) { - this(maxSize, maxBlocks, null); - } - - /** - * Constructor. 
- * - * @param maxSize Maximum size. When reached, eviction begins. - * @param maxBlocks Maximum amount of blocks. When reached, eviction begins. - * @param excludePaths Collection of regex for path which must not be evicted. - */ - public GridCacheGgfsPerBlockLruEvictionPolicy(long maxSize, int maxBlocks, - @Nullable Collection<String> excludePaths) { - this.maxSize = maxSize; - this.maxBlocks = maxBlocks; - this.excludePaths = excludePaths; - } - - /** {@inheritDoc} */ - @Override public void onEntryAccessed(boolean rmv, GridCacheEntry<GridGgfsBlockKey, byte[]> entry) { - if (!rmv) { - if (!entry.isCached()) - return; - - if (touch(entry)) - shrink(); - } - else { - MetaEntry meta = entry.removeMeta(META_NODE); - - if (meta != null && queue.unlinkx(meta.node())) - changeSize(-meta.size()); - } - } - - /** - * @param entry Entry to touch. - * @return {@code True} if new node has been added to queue by this call. - */ - private boolean touch(GridCacheEntry<GridGgfsBlockKey, byte[]> entry) { - byte[] val = entry.peek(); - - int blockSize = val != null ? val.length : 0; - - MetaEntry meta = entry.meta(META_NODE); - - // Entry has not been enqueued yet. - if (meta == null) { - while (true) { - Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> node = queue.offerLastx(entry); - - meta = new MetaEntry(node, blockSize); - - if (entry.putMetaIfAbsent(META_NODE, meta) != null) { - // Was concurrently added, need to clear it from queue. - queue.unlinkx(node); - - // Queue has not been changed. - return false; - } - else if (node.item() != null) { - if (!entry.isCached()) { - // Was concurrently evicted, need to clear it from queue. - queue.unlinkx(node); - - return false; - } - - // Increment current size. - changeSize(blockSize); - - return true; - } - // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. 
- else if (!entry.removeMeta(META_NODE, node)) - return false; - } - } - else { - int oldBlockSize = meta.size(); - - Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> node = meta.node(); - - if (queue.unlinkx(node)) { - // Move node to tail. - Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> newNode = queue.offerLastx(entry); - - int delta = blockSize - oldBlockSize; - - if (!entry.replaceMeta(META_NODE, meta, new MetaEntry(newNode, blockSize))) { - // Was concurrently added, need to clear it from queue. - if (queue.unlinkx(newNode)) - delta -= blockSize; - } - - if (delta != 0) { - changeSize(delta); - - if (delta > 0) - // Total size increased, so shrinking could be needed. - return true; - } - } - } - - // Entry is already in queue. - return false; - } - - /** - * Shrinks queue to maximum allowed size. - */ - private void shrink() { - long maxSize = this.maxSize; - int maxBlocks = this.maxBlocks; - - int cnt = queue.sizex(); - - for (int i = 0; i < cnt && (maxBlocks > 0 && queue.sizex() > maxBlocks || - maxSize > 0 && curSize.longValue() > maxSize); i++) { - GridCacheEntry<GridGgfsBlockKey, byte[]> entry = queue.poll(); - - if (entry == null) - break; // Queue is empty. - - byte[] val = entry.peek(); - - if (val != null) - changeSize(-val.length); // Change current size as we polled entry from the queue. - - if (!entry.evict()) { - // Reorder entries which we failed to evict. - entry.removeMeta(META_NODE); - - touch(entry); - } - } - } - - /** - * Change current size. - * - * @param delta Delta in bytes. 
- */ - private void changeSize(int delta) { - if (delta != 0) - curSize.add(delta); - } - - /** {@inheritDoc} */ - @Override public long getMaxSize() { - return maxSize; - } - - /** {@inheritDoc} */ - @Override public void setMaxSize(long maxSize) { - this.maxSize = maxSize; - } - - /** {@inheritDoc} */ - @Override public int getMaxBlocks() { - return maxBlocks; - } - - /** {@inheritDoc} */ - @Override public void setMaxBlocks(int maxBlocks) { - this.maxBlocks = maxBlocks; - } - - /** {@inheritDoc} */ - @Override public Collection<String> getExcludePaths() { - return Collections.unmodifiableCollection(excludePaths); - } - - /** {@inheritDoc} */ - @Override public void setExcludePaths(@Nullable Collection<String> excludePaths) { - this.excludePaths = excludePaths; - - excludeRecompile.set(true); - } - - /** {@inheritDoc} */ - @Override public long getCurrentSize() { - return curSize.longValue(); - } - - /** {@inheritDoc} */ - @Override public int getCurrentBlocks() { - return queue.size(); - } - - /** - * Check whether provided path must be excluded from evictions. - * - * @param path Path. - * @return {@code True} if the path matches one of the exclude patterns, i.e. no block of the related file must be evicted. - * @throws IgniteCheckedException In case of faulty patterns. - */ - public boolean exclude(IgniteFsPath path) throws IgniteCheckedException { - assert path != null; - - Collection<Pattern> excludePatterns0; - - if (excludeRecompile.compareAndSet(true, false)) { - // Recompile. 
- Collection<String> excludePaths0 = excludePaths; - - if (excludePaths0 != null) { - excludePatterns0 = new HashSet<>(excludePaths0.size(), 1.0f); - - for (String excludePath : excludePaths0) { - try { - excludePatterns0.add(Pattern.compile(excludePath)); - } - catch (PatternSyntaxException ignore) { - throw new IgniteCheckedException("Invalid regex pattern: " + excludePath); - } - } - - excludePatterns = excludePatterns0; - } - else - excludePatterns0 = excludePatterns = null; - } - else - excludePatterns0 = excludePatterns; - - if (excludePatterns0 != null) { - String pathStr = path.toString(); - - for (Pattern pattern : excludePatterns0) { - if (pattern.matcher(pathStr).matches()) - return true; - } - } - - return false; - } - - /** - * Meta entry. - */ - private static class MetaEntry { - /** Queue node. */ - private final Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> node; - - /** Data size. */ - private final int size; - - /** - * Constructor. - * - * @param node Queue node. - * @param size Data size. - */ - private MetaEntry(Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> node, int size) { - assert node != null; - assert size >= 0; - - this.node = node; - this.size = size; - } - - /** - * @return Queue node. - */ - private Node<GridCacheEntry<GridGgfsBlockKey, byte[]>> node() { - return node; - } - - /** - * @return Data size. - */ - private int size() { - return size; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicyMBean.java deleted file mode 100644 index 994a227..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/ggfs/GridCacheGgfsPerBlockLruEvictionPolicyMBean.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.ggfs; - -import org.apache.ignite.mbean.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * MBean for {@code GGFS per-block LRU} eviction policy. - */ -@IgniteMBeanDescription("MBean for GGFS per-block LRU cache eviction policy.") -public interface GridCacheGgfsPerBlockLruEvictionPolicyMBean { - /** - * Gets maximum allowed size of all blocks in bytes. - * - * @return Maximum allowed size of all blocks in bytes. 
- */ - @IgniteMBeanDescription("Maximum allowed size of all blocks in bytes.") - public long getMaxSize(); - - /** - * Sets maximum allowed size of data in all blocks in bytes. - * - * @param maxSize Maximum allowed size of data in all blocks in bytes. - */ - @IgniteMBeanDescription("Sets aximum allowed size of data in all blocks in bytes.") - public void setMaxSize(long maxSize); - - /** - * Gets maximum allowed amount of blocks. - * - * @return Maximum allowed amount of blocks. - */ - @IgniteMBeanDescription("Maximum allowed amount of blocks.") - public int getMaxBlocks(); - - /** - * Sets maximum allowed amount of blocks. - * - * @param maxBlocks Maximum allowed amount of blocks. - */ - @IgniteMBeanDescription("Sets maximum allowed amount of blocks.") - public void setMaxBlocks(int maxBlocks); - - /** - * Gets collection of regex for paths whose blocks must not be evicted. - * - * @return Collection of regex for paths whose blocks must not be evicted. - */ - @IgniteMBeanDescription("Collection of regex for paths whose blocks must not be evicted.") - @Nullable public Collection<String> getExcludePaths(); - - /** - * Sets collection of regex for paths whose blocks must not be evicted. - * - * @param excludePaths Collection of regex for paths whose blocks must not be evicted. - */ - @IgniteMBeanDescription("Sets collection of regex for paths whose blocks must not be evicted.") - public void setExcludePaths(@Nullable Collection<String> excludePaths); - - /** - * Gets current size of data in all blocks. - * - * @return Current size of data in all blocks. - */ - @IgniteMBeanDescription("Current size of data in all blocks.") - public long getCurrentSize(); - - /** - * Gets current amount of blocks. - * - * @return Current amount of blocks. 
- */ - @IgniteMBeanDescription("Current amount of blocks.") - public int getCurrentBlocks(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java new file mode 100644 index 0000000..d85fd7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicy.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.eviction.lru; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.jdk8.backport.*; +import org.jdk8.backport.ConcurrentLinkedDeque8.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. 
This + * implementation is very efficient since it is lock-free and does not + * create any additional table-like data structures. The {@code LRU} ordering + * information is maintained by attaching ordering metadata to cache entries. + */ +public class CacheLruEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, + CacheLruEvictionPolicyMBean { + /** Tag. */ + private final String meta = UUID.randomUUID().toString(); + + /** Maximum size. */ + private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + + /** Queue. */ + private final ConcurrentLinkedDeque8<CacheEntry<K, V>> queue = + new ConcurrentLinkedDeque8<>(); + + /** + * Constructs LRU eviction policy with all defaults. + */ + public CacheLruEvictionPolicy() { + // No-op. + } + + /** + * Constructs LRU eviction policy with maximum size. + * + * @param max Maximum allowed size of cache before entry will start getting evicted. + */ + public CacheLruEvictionPolicy(int max) { + A.ensure(max > 0, "max > 0"); + + this.max = max; + } + + /** + * Gets maximum allowed size of cache before entry will start getting evicted. + * + * @return Maximum allowed size of cache before entry will start getting evicted. + */ + @Override public int getMaxSize() { + return max; + } + + /** + * Sets maximum allowed size of cache before entry will start getting evicted. + * + * @param max Maximum allowed size of cache before entry will start getting evicted. + */ + @Override public void setMaxSize(int max) { + A.ensure(max > 0, "max > 0"); + + this.max = max; + } + + /** {@inheritDoc} */ + @Override public int getCurrentSize() { + return queue.size(); + } + + /** {@inheritDoc} */ + @Override public String getMetaAttributeName() { + return meta; + } + + /** + * Gets read-only view on internal {@code FIFO} queue in proper order. + * + * @return Read-only view on internal {@code 'FIFO'} queue. 
+ */ + public Collection<CacheEntry<K, V>> queue() { + return Collections.unmodifiableCollection(queue); + } + + /** {@inheritDoc} */ + @Override public void onEntryAccessed(boolean rmv, CacheEntry<K, V> entry) { + if (!rmv) { + if (!entry.isCached()) + return; + + if (touch(entry)) + shrink(); + } + else { + Node<CacheEntry<K, V>> node = entry.removeMeta(meta); + + if (node != null) + queue.unlinkx(node); + } + } + + /** + * @param entry Entry to touch. + * @return {@code True} if new node has been added to queue by this call. + */ + private boolean touch(CacheEntry<K, V> entry) { + Node<CacheEntry<K, V>> node = entry.meta(meta); + + // Entry has not been enqueued yet. + if (node == null) { + while (true) { + node = queue.offerLastx(entry); + + if (entry.putMetaIfAbsent(meta, node) != null) { + // Was concurrently added, need to clear it from queue. + queue.unlinkx(node); + + // Queue has not been changed. + return false; + } + else if (node.item() != null) { + if (!entry.isCached()) { + // Was concurrently evicted, need to clear it from queue. + queue.unlinkx(node); + + return false; + } + + return true; + } + // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. + else if (!entry.removeMeta(meta, node)) + return false; + } + } + else if (queue.unlinkx(node)) { + // Move node to tail. + Node<CacheEntry<K, V>> newNode = queue.offerLastx(entry); + + if (!entry.replaceMeta(meta, node, newNode)) + // Was concurrently added, need to clear it from queue. + queue.unlinkx(newNode); + } + + // Entry is already in queue. + return false; + } + + /** + * Shrinks queue to maximum allowed size. 
+ */ + private void shrink() { + int max = this.max; + + int startSize = queue.sizex(); + + for (int i = 0; i < startSize && queue.sizex() > max; i++) { + CacheEntry<K, V> entry = queue.poll(); + + if (entry == null) + break; + + if (!entry.evict()) { + entry.removeMeta(meta); + + touch(entry); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheLruEvictionPolicy.class, this, "size", queue.sizex()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicyMBean.java new file mode 100644 index 0000000..8fbefae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/CacheLruEvictionPolicyMBean.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.eviction.lru; + +import org.apache.ignite.mbean.*; + +/** + * MBean for {@code LRU} eviction policy. + */ +@IgniteMBeanDescription("MBean for LRU cache eviction policy.") +public interface CacheLruEvictionPolicyMBean { + /** + * Gets name of metadata attribute used to store eviction policy data. + * + * @return Name of metadata attribute used to store eviction policy data. + */ + @IgniteMBeanDescription("Name of metadata attribute used to store eviction policy data.") + public String getMetaAttributeName(); + + /** + * Gets maximum allowed cache size. + * + * @return Maximum allowed cache size. + */ + @IgniteMBeanDescription("Maximum allowed cache size.") + public int getMaxSize(); + + /** + * Sets maximum allowed cache size. + * + * @param max Maximum allowed cache size. + */ + @IgniteMBeanDescription("Sets maximum allowed cache size.") + public void setMaxSize(int max); + + /** + * Gets current queue size. + * + * @return Current queue size. + */ + @IgniteMBeanDescription("Current queue size.") + public int getCurrentSize(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicy.java deleted file mode 100644 index 73d5c40..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicy.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.lru; - -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.*; -import org.jdk8.backport.*; -import org.jdk8.backport.ConcurrentLinkedDeque8.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This - * implementation is very efficient since it is lock-free and does not - * create any additional table-like data structures. The {@code LRU} ordering - * information is maintained by attaching ordering metadata to cache entries. - */ -public class GridCacheLruEvictionPolicy<K, V> implements GridCacheEvictionPolicy<K, V>, - GridCacheLruEvictionPolicyMBean { - /** Tag. */ - private final String meta = UUID.randomUUID().toString(); - - /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; - - /** Queue. */ - private final ConcurrentLinkedDeque8<GridCacheEntry<K, V>> queue = - new ConcurrentLinkedDeque8<>(); - - /** - * Constructs LRU eviction policy with all defaults. - */ - public GridCacheLruEvictionPolicy() { - // No-op. - } - - /** - * Constructs LRU eviction policy with maximum size. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. 
- */ - public GridCacheLruEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** - * Gets maximum allowed size of cache before entry will start getting evicted. - * - * @return Maximum allowed size of cache before entry will start getting evicted. - */ - @Override public int getMaxSize() { - return max; - } - - /** - * Sets maximum allowed size of cache before entry will start getting evicted. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. - */ - @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** {@inheritDoc} */ - @Override public int getCurrentSize() { - return queue.size(); - } - - /** {@inheritDoc} */ - @Override public String getMetaAttributeName() { - return meta; - } - - /** - * Gets read-only view on internal {@code FIFO} queue in proper order. - * - * @return Read-only view on internal {@code 'FIFO'} queue. - */ - public Collection<GridCacheEntry<K, V>> queue() { - return Collections.unmodifiableCollection(queue); - } - - /** {@inheritDoc} */ - @Override public void onEntryAccessed(boolean rmv, GridCacheEntry<K, V> entry) { - if (!rmv) { - if (!entry.isCached()) - return; - - if (touch(entry)) - shrink(); - } - else { - Node<GridCacheEntry<K, V>> node = entry.removeMeta(meta); - - if (node != null) - queue.unlinkx(node); - } - } - - /** - * @param entry Entry to touch. - * @return {@code True} if new node has been added to queue by this call. - */ - private boolean touch(GridCacheEntry<K, V> entry) { - Node<GridCacheEntry<K, V>> node = entry.meta(meta); - - // Entry has not been enqueued yet. - if (node == null) { - while (true) { - node = queue.offerLastx(entry); - - if (entry.putMetaIfAbsent(meta, node) != null) { - // Was concurrently added, need to clear it from queue. - queue.unlinkx(node); - - // Queue has not been changed. 
- return false; - } - else if (node.item() != null) { - if (!entry.isCached()) { - // Was concurrently evicted, need to clear it from queue. - queue.unlinkx(node); - - return false; - } - - return true; - } - // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. - else if (!entry.removeMeta(meta, node)) - return false; - } - } - else if (queue.unlinkx(node)) { - // Move node to tail. - Node<GridCacheEntry<K, V>> newNode = queue.offerLastx(entry); - - if (!entry.replaceMeta(meta, node, newNode)) - // Was concurrently added, need to clear it from queue. - queue.unlinkx(newNode); - } - - // Entry is already in queue. - return false; - } - - /** - * Shrinks queue to maximum allowed size. - */ - private void shrink() { - int max = this.max; - - int startSize = queue.sizex(); - - for (int i = 0; i < startSize && queue.sizex() > max; i++) { - GridCacheEntry<K, V> entry = queue.poll(); - - if (entry == null) - break; - - if (!entry.evict()) { - entry.removeMeta(meta); - - touch(entry); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheLruEvictionPolicy.class, this, "size", queue.sizex()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicyMBean.java deleted file mode 100644 index d23edc9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/GridCacheLruEvictionPolicyMBean.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.lru; - -import org.apache.ignite.mbean.*; - -/** - * MBean for {@code LRU} eviction policy. - */ -@IgniteMBeanDescription("MBean for LRU cache eviction policy.") -public interface GridCacheLruEvictionPolicyMBean { - /** - * Gets name of metadata attribute used to store eviction policy data. - * - * @return Name of metadata attribute used to store eviction policy data. - */ - @IgniteMBeanDescription("Name of metadata attribute used to store eviction policy data.") - public String getMetaAttributeName(); - - /** - * Gets maximum allowed cache size. - * - * @return Maximum allowed cache size. - */ - @IgniteMBeanDescription("Maximum allowed cache size.") - public int getMaxSize(); - - /** - * Sets maximum allowed cache size. - * - * @param max Maximum allowed cache size. - */ - @IgniteMBeanDescription("Sets maximum allowed cache size.") - public void setMaxSize(int max); - - /** - * Gets current queue size. - * - * @return Current queue size. 
- */ - @IgniteMBeanDescription("Current queue size.") - public int getCurrentSize(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java new file mode 100644 index 0000000..4b57f9e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicy.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.eviction.random; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Cache eviction policy which will select random cache entry for eviction if cache + * size exceeds the {@link #getMaxSize()} parameter. 
This implementation is + * extremely light weight, lock-free, and does not create any data structures to maintain + * any order for eviction. + * <p> + * Random eviction will provide the best performance over any key set in which every + * key has the same probability of being accessed. + */ +public class CacheRandomEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, + CacheRandomEvictionPolicyMBean { + /** Maximum size. */ + private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + + /** + * Constructs random eviction policy with all defaults. + */ + public CacheRandomEvictionPolicy() { + // No-op. + } + + /** + * Constructs random eviction policy with maximum size. + * + * @param max Maximum allowed size of cache before entry will start getting evicted. + */ + public CacheRandomEvictionPolicy(int max) { + A.ensure(max > 0, "max > 0"); + + this.max = max; + } + + /** + * Gets maximum allowed size of cache before entry will start getting evicted. + * + * @return Maximum allowed size of cache before entry will start getting evicted. + */ + @Override public int getMaxSize() { + return max; + } + + /** + * Sets maximum allowed size of cache before entry will start getting evicted. + * + * @param max Maximum allowed size of cache before entry will start getting evicted. + */ + @Override public void setMaxSize(int max) { + A.ensure(max > 0, "max > 0"); + + this.max = max; + } + + /** {@inheritDoc} */ + @Override public void onEntryAccessed(boolean rmv, CacheEntry<K, V> entry) { + if (!entry.isCached()) + return; + + Cache<K, V> cache = entry.projection().cache(); + + int size = cache.size(); + + for (int i = max; i < size; i++) { + CacheEntry<K, V> e = cache.randomEntry(); + + if (e != null) + e.evict(); + } + } + + /** + * Checks entry for empty value. + * + * @param entry Entry to check. + * @return {@code True} if entry is empty. 
+ */ + private boolean empty(CacheEntry<K, V> entry) { + try { + return entry.peek(F.asList(GridCachePeekMode.GLOBAL)) == null; + } + catch (IgniteCheckedException e) { + U.error(null, e.getMessage(), e); + + assert false : "Should never happen: " + e; + + return false; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheRandomEvictionPolicy.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicyMBean.java new file mode 100644 index 0000000..aaf4e4d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/CacheRandomEvictionPolicyMBean.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.eviction.random; + +import org.apache.ignite.mbean.*; + +/** + * MBean for {@code random} eviction policy. + */ +@IgniteMBeanDescription("MBean for random cache eviction policy.") +public interface CacheRandomEvictionPolicyMBean { + /** + * Gets maximum allowed cache size. + * + * @return Maximum allowed cache size. + */ + @IgniteMBeanDescription("Maximum allowed cache size.") + public int getMaxSize(); + + /** + * Sets maximum allowed cache size. + * + * @param max Maximum allowed cache size. + */ + @IgniteMBeanDescription("Sets maximum allowed cache size.") + public void setMaxSize(int max); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicy.java deleted file mode 100644 index 376171e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicy.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.random; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Cache eviction policy which will select random cache entry for eviction if cache - * size exceeds the {@link #getMaxSize()} parameter. This implementation is - * extremely light weight, lock-free, and does not create any data structures to maintain - * any order for eviction. - * <p> - * Random eviction will provide the best performance over any key set in which every - * key has the same probability of being accessed. - */ -public class GridCacheRandomEvictionPolicy<K, V> implements GridCacheEvictionPolicy<K, V>, - GridCacheRandomEvictionPolicyMBean { - /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; - - /** - * Constructs random eviction policy with all defaults. - */ - public GridCacheRandomEvictionPolicy() { - // No-op. - } - - /** - * Constructs random eviction policy with maximum size. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. - */ - public GridCacheRandomEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** - * Gets maximum allowed size of cache before entry will start getting evicted. - * - * @return Maximum allowed size of cache before entry will start getting evicted. - */ - @Override public int getMaxSize() { - return max; - } - - /** - * Sets maximum allowed size of cache before entry will start getting evicted. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. 
- */ - @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** {@inheritDoc} */ - @Override public void onEntryAccessed(boolean rmv, GridCacheEntry<K, V> entry) { - if (!entry.isCached()) - return; - - GridCache<K, V> cache = entry.projection().cache(); - - int size = cache.size(); - - for (int i = max; i < size; i++) { - GridCacheEntry<K, V> e = cache.randomEntry(); - - if (e != null) - e.evict(); - } - } - - /** - * Checks entry for empty value. - * - * @param entry Entry to check. - * @return {@code True} if entry is empty. - */ - private boolean empty(GridCacheEntry<K, V> entry) { - try { - return entry.peek(F.asList(GridCachePeekMode.GLOBAL)) == null; - } - catch (IgniteCheckedException e) { - U.error(null, e.getMessage(), e); - - assert false : "Should never happen: " + e; - - return false; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheRandomEvictionPolicy.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicyMBean.java deleted file mode 100644 index bcf9496..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/GridCacheRandomEvictionPolicyMBean.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.random; - -import org.apache.ignite.mbean.*; - -/** - * MBean for {@code random} eviction policy. - */ -@IgniteMBeanDescription("MBean for random cache eviction policy.") -public interface GridCacheRandomEvictionPolicyMBean { - /** - * Gets maximum allowed cache size. - * - * @return Maximum allowed cache size. - */ - @IgniteMBeanDescription("Maximum allowed cache size.") - public int getMaxSize(); - - /** - * Sets maximum allowed cache size. - * - * @param max Maximum allowed cache size. - */ - @IgniteMBeanDescription("Sets maximum allowed cache size.") - public void setMaxSize(int max); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java new file mode 100644 index 0000000..eb5d265 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * API for configuring and executing continuous cache queries. + * <p> + * Continuous queries are executed as follows: + * <ol> + * <li> + * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} + * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed + * locally. + * </li> + * <li> + * Each node iterates through existing cache data and registers listeners that will + * notify about further updates. + * <li> + * Each key-value pair is passed through optional filter and if this filter returns + * true, key-value pair is sent to the master node (the one that executed query). + * If filter is not provided, all pairs are sent. + * </li> + * <li> + * When master node receives key-value pairs, it notifies the local callback. + * </li> + * </ol> + * <h2 class="header">NOTE</h2> + * Under some concurrent circumstances callback may get several notifications + * for one cache update. This should be taken into account when implementing callback. 
+ * <h1 class="header">Query usage</h1> + * As an example, suppose we have cache with {@code 'Person'} objects and we need + * to query all persons with salary above 1000. + * <p> + * Here is the {@code Person} class: + * <pre name="code" class="java"> + * public class Person { + * // Name. + * private String name; + * + * // Salary. + * private double salary; + * + * ... + * } + * </pre> + * <p> + * You can create and execute continuous query like so: + * <pre name="code" class="java"> + * // Create new continuous query. + * qry = cache.createContinuousQuery(); + * + * // Callback that is called locally when update notifications are received. + * // It simply prints out information about all created persons. + * qry.callback(new IgniteBiPredicate<UUID, Collection<Map.Entry<UUID, Person>>>() { + * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { + * for (Map.Entry<UUID, Person> e : entries) { + * Person p = e.getValue(); + * + * X.println(">>>"); + * X.println(">>> " + p.getFirstName() + " " + p.getLastName() + + * "'s salary is " + p.getSalary()); + * X.println(">>>"); + * } + * + * return true; + * } + * }); + * + * // This query will return persons with salary above 1000. + * qry.filter(new IgniteBiPredicate<UUID, Person>() { + * @Override public boolean apply(UUID uuid, Person person) { + * return person.getSalary() > 1000; + * } + * }); + * + * // Execute query. + * qry.execute(); + * </pre> + * This will execute query on all nodes that have cache you are working with and notify callback + * with both data that already exists in cache and further updates. + * <p> + * To stop receiving updates call {@link #close()} method: + * <pre name="code" class="java"> + * qry.close(); + * </pre> + * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. + * If you need to repeat execution, use {@link CacheQueries#createContinuousQuery()} method to create + * new query.
+ */ +public interface CacheContinuousQuery<K, V> extends AutoCloseable { + /** + * Default buffer size. Size of {@code 1} means that all entries + * will be sent to master node immediately (buffering is disabled). + */ + public static final int DFLT_BUF_SIZE = 1; + + /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ + public static final long DFLT_TIME_INTERVAL = 0; + + /** + * Default value for automatic unsubscription flag. Remote filters + * will be unregistered by default if master node leaves topology. + */ + public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; + + /** + * Sets local callback. This callback is called only + * in local node when new updates are received. + * <p> + * The callback predicate accepts ID of the node from where updates + * are received and collection of received entries. Note that + * for removed entries value will be {@code null}. + * <p> + * If the predicate returns {@code false}, query execution will + * be cancelled. + * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local + * or distributed locking (e.g., synchronization or transactional + * cache operations), should be executed asynchronously without + * blocking the thread that called the callback. Otherwise, you + * can get deadlocks. + * + * @param cb Local callback. + * @deprecated Deprecated in favor of {@link #localCallback(IgniteBiPredicate)} method. + */ + @Deprecated + public void callback(@Nullable IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> cb); + + /** + * Gets local callback. See {@link #callback(IgniteBiPredicate)} for more information. + * + * @return Local callback. + * @deprecated Deprecated in favor of {@link #localCallback()} method. + */ + @Deprecated + public IgniteBiPredicate<UUID, Collection<Map.Entry<K, V>>> callback(); + + /** + * Sets optional key-value filter. This filter is called before + * entry is sent to the master node. 
+ * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local + * or distributed locking (e.g., synchronization or transactional + * cache operations), should be executed asynchronously without + * blocking the thread that called the filter. Otherwise, you + * can get deadlocks. + * + * @param filter Key-value filter. + * @deprecated Deprecated in favor of {@link #remoteFilter(org.apache.ignite.lang.IgnitePredicate)} method. + */ + @Deprecated + public void filter(@Nullable IgniteBiPredicate<K, V> filter); + + /** + * Gets key-value filter. See {@link #filter(IgniteBiPredicate)} for more information. + * + * @return Key-value filter. + * @deprecated Deprecated in favor of {@link #remoteFilter()} method. + */ + @Deprecated + @Nullable public IgniteBiPredicate<K, V> filter(); + + /** + * Sets local callback. This callback is called only + * in local node when new updates are received. + * <p> + * The callback predicate accepts ID of the node from where updates + * are received and collection of received entries. Note that + * for removed entries value will be {@code null}. + * <p> + * If the predicate returns {@code false}, query execution will + * be cancelled. + * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local + * or distributed locking (e.g., synchronization or transactional + * cache operations), should be executed asynchronously without + * blocking the thread that called the callback. Otherwise, you + * can get deadlocks. + * + * @param locCb Local callback. + */ + public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb); + + /** + * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for more information. + * + * @return Local callback. + */ + @Nullable public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback(); + + /** + * Sets optional key-value filter. This filter is called before + * entry is sent to the master node.
+ * <p> + * <b>WARNING:</b> all operations that involve any kind of JVM-local + * or distributed locking (e.g., synchronization or transactional + * cache operations), should be executed asynchronously without + * blocking the thread that called the filter. Otherwise, you + * can get deadlocks. + * + * @param filter Key-value filter. + */ + public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter); + + /** + * Gets key-value filter. See {@link #remoteFilter(IgnitePredicate)} for more information. + * + * @return Key-value filter. + */ + @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter(); + + /** + * Sets buffer size. + * <p> + * When a cache update happens, entry is first put into a buffer. + * Entries from buffer will be sent to the master node only if + * the buffer is full or time provided via {@link #timeInterval(long)} + * method is exceeded. + * <p> + * Default buffer size is {@code 1} which means that entries will + * be sent immediately (buffering is disabled). + * + * @param bufSize Buffer size. + */ + public void bufferSize(int bufSize); + + /** + * Gets buffer size. See {@link #bufferSize(int)} for more information. + * + * @return Buffer size. + */ + public int bufferSize(); + + /** + * Sets time interval. + * <p> + * When a cache update happens, entry is first put into a buffer. + * Entries from buffer will be sent to the master node only if + * the buffer is full (its size can be provided via {@link #bufferSize(int)} + * method) or time provided via this method is exceeded. + * <p> + * Default time interval is {@code 0} which means that time check is + * disabled and entries will be sent only when buffer is full. + * + * @param timeInterval Time interval. + */ + public void timeInterval(long timeInterval); + + /** + * Gets time interval. See {@link #timeInterval(long)} for more information. + * + * @return Time interval.
+ */ + public long timeInterval(); + + /** + * Sets automatic unsubscribe flag. + * <p> + * This flag indicates that query filters on remote nodes should be automatically + * unregistered if master node (node that initiated the query) leaves topology. + * If this flag is {@code false}, filters will be unregistered only when + * the query is cancelled from master node, and won't ever be unregistered if + * master node leaves grid. + * <p> + * Default value for this flag is {@code true}. + * + * @param autoUnsubscribe Automatic unsubscription flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe); + + /** + * Gets automatic unsubscribe flag. See {@link #autoUnsubscribe(boolean)} + * for more information. + * + * @return Automatic unsubscribe flag. + */ + public boolean isAutoUnsubscribe(); + + /** + * Starts continuous query execution on the whole grid. + * <p> + * Note that if grid contains nodes without appropriate cache, + * these nodes will be filtered out. + * <p> + * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} + * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches + * query will be always executed locally. + * + * @throws IgniteCheckedException In case of error. + */ + public void execute() throws IgniteCheckedException; + + /** + * Starts continuous query execution on provided set of nodes. + * <p> + * Note that if provided projection contains nodes without + * appropriate cache, these nodes will be filtered out. + * <p> + * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} + * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches + * query will be always executed locally. + * + * @param prj Grid projection. + * @throws IgniteCheckedException In case of error. + */ + public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException; + + /** + * Stops continuous query execution. 
+ * <p> + * Note that one query instance can be executed only once. + * After it's cancelled, it's non-operational. + * If you need to repeat execution, use {@link CacheQueries#createContinuousQuery()} + * method to create new query. + * + * @throws IgniteCheckedException In case of error. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java new file mode 100644 index 0000000..90d3602 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Entry used for continuous query notifications. 
+ */ +public interface CacheContinuousQueryEntry<K, V> extends Map.Entry<K, V>, Serializable { + /** + * Gets entry key. + * + * @return Entry key. + */ + @Override public K getKey(); + + /** + * Gets entry new value. New value may be null, if entry is being removed. + * + * @return Entry new value. + */ + @Override @Nullable public V getValue(); + + /** + * Gets entry old value. Old value may be null if entry is being inserted (not updated). + * + * @return Entry old value. + */ + @Nullable public V getOldValue(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueries.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueries.java new file mode 100644 index 0000000..1cc59bb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueries.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Facade for creating distributed queries. It contains various {@code 'createXxxQuery(..)'} + * methods for {@code SQL}, {@code TEXT}, and {@code SCAN} query creation (see {@link CacheQuery} + * for more information). + * <p> + * Instance of {@code CacheQueries} is obtained from cache projection as follows: + * <pre name="code" class="java"> + * CacheQueries q = GridGain.grid().cache("myCache").queries(); + * </pre> + */ +public interface CacheQueries<K, V> { + /** + * Creates user's SQL query, queried class, and query clause which is generally + * a where clause. For more information refer to {@link CacheQuery} documentation. + * + * @param cls Query class. + * @param clause Query clause. + * @return Created query. + */ + public CacheQuery<Map.Entry<K, V>> createSqlQuery(Class<?> cls, String clause); + + /** + * Creates user's SQL query, queried class, and query clause which is generally + * a where clause. For more information refer to {@link CacheQuery} documentation. + * + * @param clsName Query class name. + * @param clause Query clause. + * @return Created query. + */ + public CacheQuery<Map.Entry<K, V>> createSqlQuery(String clsName, String clause); + + /** + * Creates user's SQL fields query for given clause. For more information refer to + * {@link CacheQuery} documentation. + * + * @param qry Query. + * @return Created query. + */ + public CacheQuery<List<?>> createSqlFieldsQuery(String qry); + + /** + * Creates user's full text query, queried class, and query clause. + * For more information refer to {@link CacheQuery} documentation. + * + * @param clsName Query class name. + * @param search Search clause. + * @return Created query. + */ + public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName, String search); + + /** + * Creates user's full text query, queried class, and query clause. 
+ * For more information refer to {@link CacheQuery} documentation. + * + * @param cls Query class. + * @param search Search clause. + * @return Created query. + */ + public CacheQuery<Map.Entry<K, V>> createFullTextQuery(Class<?> cls, String search); + + /** + * Creates user's predicate based scan query. + * + * @param filter Scan filter. + * @return Created query. + */ + public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter); + + /** + * Creates new continuous query. + * <p> + * For more information refer to {@link CacheContinuousQuery} documentation. + * + * @return Created continuous query. + * @see CacheContinuousQuery + */ + public CacheContinuousQuery<K, V> createContinuousQuery(); + + /** + * Forces this cache to rebuild all search indexes of given value type. Sometimes indexes + * may hold references to objects that have already been removed from cache. Although + * not affecting query results, these objects may consume extra memory. Rebuilding + * indexes will remove any redundant references that may have temporarily got stuck + * inside in-memory index. + * + * @param cls Value type to rebuild indexes for. + * + * @return Future that will be completed when rebuilding of all indexes is finished. + */ + public IgniteFuture<?> rebuildIndexes(Class<?> cls); + + /** + * Forces this cache to rebuild all search indexes of given value type. Sometimes indexes + * may hold references to objects that have already been removed from cache. Although + * not affecting query results, these objects may consume extra memory. Rebuilding + * indexes will remove any redundant references that may have temporarily got stuck + * inside in-memory index. + * + * @param typeName Value type name to rebuild indexes for. + * + * @return Future that will be completed when rebuilding of all indexes is finished. + */ + public IgniteFuture<?> rebuildIndexes(String typeName); + + /** + * Forces this cache to rebuild search indexes of all types. 
Sometimes indexes + * may hold references to objects that have already been removed from cache. Although + * not affecting query results, these objects may consume extra memory. Rebuilding + * indexes will remove any redundant references that may have temporarily got stuck + * inside in-memory index. + * + * @return Future that will be completed when rebuilding of all indexes is finished. + */ + public IgniteFuture<?> rebuildAllIndexes(); + + /** + * Accumulated metrics for all queries executed for this cache. + * + * @return Cache query metrics. + */ + public CacheQueryMetrics metrics(); + + /** + * Resets accumulated metrics. + */ + public void resetMetrics(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cafee25f/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQuery.java new file mode 100644 index 0000000..4be4f95 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQuery.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +/** + * Main API for configuring and executing cache queries. + * <p> + * Cache queries are created from {@link CacheQueries} API via any of the available + * {@code createXXXQuery(...)} methods. + * <h1 class="header">SQL Queries</h1> + * {@code SQL} query allows to execute distributed cache + * queries using standard SQL syntax. All values participating in where clauses + * or joins must be annotated with {@link CacheQuerySqlField} annotation. Query can be created + * with {@link CacheQueries#createSqlQuery(Class, String)} method. + * <h2 class="header">Field Queries</h2> + * By default {@code select} clause is ignored as query result contains full objects. + * If it is needed to select individual fields, use {@link CacheQueries#createSqlFieldsQuery(String)} method. + * This type of query replaces full objects with individual fields. Note that selected fields + * must be annotated with {@link CacheQuerySqlField} annotation. + * <h2 class="header">Cross-Cache Queries</h2> + * You are allowed to query data from several caches. Cache that this query was created on is + * treated as default schema in this case. Other caches can be referenced by their names. + * <p> + * Note that cache name is case sensitive and has to always be specified in double quotes. 
+ * Here is an example of cross cache query (note that 'replicated' and 'partitioned' are + * cache names for replicated and partitioned caches respectively): + * <pre name="code" class="java"> + * CacheQuery<Map.Entry<Integer, FactPurchase>> storePurchases = cache.queries().createSqlQuery( + * Purchase.class, + * "from \"replicated\".Store, \"partitioned\".Purchase where Store.id=Purchase.storeId and Store.id=?"); + * </pre> + * <h2 class="header">Custom functions in SQL queries.</h2> + * It is possible to write custom Java methods and call them from SQL queries. These methods must be public static + * and annotated with {@link CacheQuerySqlFunction}. Classes containing these methods must be registered in + * {@link GridQueryConfiguration#setIndexCustomFunctionClasses(Class[])}. + * <h1 class="header">Full Text Queries</h1> + * GridGain supports full text queries based on Apache Lucene engine. These queries are created by + * {@link CacheQueries#createFullTextQuery(Class, String)} method. Note that all fields that + * are expected to show up in text query results must be annotated with {@link CacheQueryTextField} + * annotation. + * <h1 class="header">Scan Queries</h1> + * Sometimes when it is known in advance that SQL query will cause a full data scan, or whenever data set + * is relatively small, the full scan query may be used. This query will iterate over all cache + * entries, skipping over entries that don't pass the optionally provided key-value filter + * (see {@link CacheQueries#createScanQuery(org.apache.ignite.lang.IgniteBiPredicate)} method). + * <h2 class="header">Limitations</h2> + * Data in GridGain cache is usually distributed across several nodes, + * so some queries may not work as expected.
Keep in mind the following limitations + * (not applied if data is queried from one node only): + * <ul> + * <li> + * {@code Group by} and {@code order by} statements are applied separately + * on each node, so result set will likely be incorrectly grouped or sorted + * after results from multiple remote nodes are grouped together. + * </li> + * <li> + * Aggregation functions like {@code sum}, {@code max}, {@code avg}, etc. + * are also applied on each node. Therefore you will get several results + * containing aggregated values, one for each node. + * </li> + * <li> + * Joins will work correctly only if joined objects are stored in + * collocated mode or at least one side of the join is stored in + * {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to + * {@link org.apache.ignite.cache.affinity.CacheAffinityKey} javadoc for more information about collocation. + * </li> + * </ul> + * <h1 class="header">Query usage</h1> + * As an example, suppose we have a data model consisting of {@code 'Person'} and {@code 'Organization'} + * classes defined as follows: + * <pre name="code" class="java"> + * public class Organization { + * // Indexed field. + * @CacheQuerySqlField(index = true) + * private long id; + * + * // Indexed field. + * @CacheQuerySqlField(index = true) + * private String name; + * ... + * } + * + * public class Person { + * // Indexed field. + * @CacheQuerySqlField(index = true) + * private long id; + * + * // Indexed field (Organization ID, used as a foreign key). + * @CacheQuerySqlField(index = true) + * private long orgId; + * + * // Without SQL field annotation, this field cannot be used in queries. + * private String name; + * + * // Not indexed field. + * @CacheQuerySqlField + * private double salary; + * + * // Index for text search. + * @CacheQueryTextField + * private String resume; + * ...
+ * } + * </pre> + * Then you can create and execute queries that check various salary ranges like so: + * <pre name="code" class="java"> + * Cache<Long, Person> cache = G.grid().cache(); + * ... + * // Create query which selects salaries based on range for all employees + * // that work for a certain company. + * CacheQuery<Map.Entry<Long, Person>> qry = cache.queries().createSqlQuery(Person.class, + * "from Person, Organization where Person.orgId = Organization.id " + + * "and Organization.name = ? and Person.salary > ? and Person.salary <= ?"); + * + * // Query all nodes to find all cached GridGain employees + * // with salaries less than 1000. + * qry.execute("GridGain", 0, 1000); + * + * // Query only remote nodes to find all remotely cached GridGain employees + * // with salaries greater than 1000 and less than 2000. + * qry.projection(grid.remoteProjection()).execute("GridGain", 1000, 2000); + * </pre> + * Here is a possible query that will use Lucene text search to scan all resumes to + * check if employees have {@code Master} degree: + * <pre name="code" class="java"> + * CacheQuery<Map.Entry<Long, Person>> mastersQry = + * cache.queries().createFullTextQuery(Person.class, "Master"); + * + * // Query all cache nodes. + * mastersQry.execute(); + * </pre> + * <h1 class="header">Geo-Spatial Indexes and Queries</h1> + * GridGain also supports <b>Geo-Spatial Indexes</b>. Here is an example of a geo-spatial index: + * <pre name="code" class="java"> + * private class MapPoint implements Serializable { + * // Geospatial index. + * @CacheQuerySqlField(index = true) + * private com.vividsolutions.jts.geom.Point location; + * + * // Not indexed field.
+ * @CacheQuerySqlField + * private String name; + * + * public MapPoint(com.vividsolutions.jts.geom.Point location, String name) { + * this.location = location; + * this.name = name; + * } + * } + * </pre> + * Example of spatial query on the geo-indexed field from above: + * <pre name="code" class="java"> + * com.vividsolutions.jts.geom.GeometryFactory factory = new com.vividsolutions.jts.geom.GeometryFactory(); + * + * com.vividsolutions.jts.geom.Polygon square = factory.createPolygon(new Coordinate[] { + * new com.vividsolutions.jts.geom.Coordinate(0, 0), + * new com.vividsolutions.jts.geom.Coordinate(0, 100), + * new com.vividsolutions.jts.geom.Coordinate(100, 100), + * new com.vividsolutions.jts.geom.Coordinate(100, 0), + * new com.vividsolutions.jts.geom.Coordinate(0, 0) + * }); + * + * Map.Entry<String, UserData> records = cache.queries().createSqlQuery(MapPoint.class, "select * from MapPoint where location && ?") + * .queryArguments(square) + * .execute() + * .get(); + * </pre> + */ +public interface CacheQuery<T> { + /** Default query page size. */ + public static final int DFLT_PAGE_SIZE = 1024; + + /** + * Sets result page size. If not provided, {@link #DFLT_PAGE_SIZE} will be used. + * Results are returned from queried nodes one page at a time. + * + * @param pageSize Page size. + * @return {@code this} query instance for chaining. + */ + public CacheQuery<T> pageSize(int pageSize); + + /** + * Sets query timeout. {@code 0} means there is no timeout (this + * is the default value). + * + * @param timeout Query timeout. + * @return {@code this} query instance for chaining. + */ + public CacheQuery<T> timeout(long timeout); + + /** + * Sets whether or not to keep all query results local. If not - only the current page + * is kept locally. Default value is {@code true}. + * + * @param keepAll Keep results or not. + * @return {@code this} query instance for chaining.
+ */ + public CacheQuery<T> keepAll(boolean keepAll); + + /** + * Sets whether or not to include backup entries into query result. This flag + * is {@code false} by default. + * + * @param incBackups Query {@code includeBackups} flag. + * @return {@code this} query instance for chaining. + */ + public CacheQuery<T> includeBackups(boolean incBackups); + + /** + * Sets whether or not to deduplicate query result set. If this flag is {@code true} + * then query result will not contain some key more than once even if several nodes + * returned entries with the same keys. Default value is {@code false}. + * + * @param dedup Query {@code enableDedup} flag. + * @return {@code this} query instance for chaining. + */ + public CacheQuery<T> enableDedup(boolean dedup); + + /** + * Sets optional grid projection to execute this query on. + * + * @param prj Projection. + * @return {@code this} query instance for chaining. + */ + public CacheQuery<T> projection(ClusterGroup prj); + + /** + * Executes the query and returns the query future. Caller may decide to iterate + * over the returned future directly in which case the iterator may block until + * the next value becomes available, or wait for the whole query to finish + * by calling any of the {@code 'get(..)'} methods on the returned future. If + * {@link #keepAll(boolean)} flag is set to {@code false}, then {@code 'get(..)'} + * methods will only return the last page received, otherwise all pages will be + * accumulated and returned to the user as a collection. + * <p> + * Note that if the passed in grid projection is a local node, then query + * will be executed locally without distribution to other nodes. + * <p> + * Also note that query state cannot be changed (clause, timeout etc.), except + * arguments, if this method was called at least once. + * + * @param args Optional arguments. + * @return Future for the query result. + */ + public CacheQueryFuture<T> execute(@Nullable Object...
args); + + /** + * Executes the query the same way as {@link #execute(Object...)} method but reduces result remotely. + * + * @param rmtReducer Remote reducer. + * @param args Optional arguments. + * @return Future for the query result. + */ + public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args); + + /** + * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely. + * + * @param rmtTransform Remote transformer. + * @param args Optional arguments. + * @return Future for the query result. + */ + public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args); + + /** + * Gets metrics for this query. + * + * @return Query metrics. + */ + public CacheQueryMetrics metrics(); + + /** + * Resets metrics for this query. + */ + public void resetMetrics(); +}