# ignite-6
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/349d51ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/349d51ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/349d51ed Branch: refs/heads/ignite-6 Commit: 349d51ed33c6e5ff8b782bf637956a46ee062e5a Parents: 7f515d1 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jan 27 10:05:57 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jan 27 16:32:43 2015 +0300 ---------------------------------------------------------------------- .../src/main/java/org/apache/ignite/Ignite.java | 38 + .../org/apache/ignite/IgniteAtomicLong.java | 14 +- .../apache/ignite/IgniteAtomicReference.java | 14 +- .../org/apache/ignite/IgniteAtomicSequence.java | 7 +- .../org/apache/ignite/IgniteAtomicStamped.java | 14 +- .../org/apache/ignite/IgniteCountDownLatch.java | 12 +- .../java/org/apache/ignite/IgniteQueue.java | 16 +- .../main/java/org/apache/ignite/IgniteSet.java | 13 +- .../IgniteAtomicConfiguration.java | 14 +- .../IgniteCollectionConfiguration.java | 149 +++ .../org/apache/ignite/internal/GridKernal.java | 32 + .../CacheDataStructuresManager.java | 176 ++++ .../CacheDataStructuresConfigurationKey.java | 62 ++ .../CacheDataStructuresProcessor.java | 968 ++++++++++--------- .../datastructures/GridCacheAtomicLongImpl.java | 62 +- .../GridCacheAtomicReferenceImpl.java | 26 +- .../GridCacheAtomicStampedImpl.java | 26 +- .../GridCacheCountDownLatchImpl.java | 37 +- .../datastructures/GridCacheQueueAdapter.java | 42 +- .../datastructures/GridCacheQueueProxy.java | 27 +- .../datastructures/GridCacheSetImpl.java | 16 + .../datastructures/GridCacheSetProxy.java | 34 +- ...eAbstractDataStructuresFailoverSelfTest.java | 351 +++---- ...actQueueFailoverDataConsistencySelfTest.java | 76 +- .../GridCacheAtomicLongApiAbstractSelfTest.java | 266 +++++ .../GridCacheAtomicLongApiSelfTest.java | 483 --------- ...CacheAtomicReferenceApiSelfAbstractTest.java | 134 +-- ...idCacheAtomicStampedApiSelfAbstractTest.java | 53 +- ...GridCacheCountDownLatchAbstractSelfTest.java | 270 ++++++ .../GridCacheCountDownLatchSelfTest.java | 408 -------- .../GridCacheQueueApiSelfAbstractTest.java | 249 +---- .../IgniteAtomicsAbstractTest.java | 2 +- .../IgniteCollectionAbstractTest.java | 119 +++ .../GridCacheLocalAtomicLongApiSelfTest.java | 33 + .../GridCacheLocalAtomicQueueApiSelfTest.java | 11 +- .../GridCacheLocalCountDownLatchSelfTest.java | 95 ++ .../local/GridCacheLocalQueueApiSelfTest.java | 19 +- ...idCachePartitionedAtomicLongApiSelfTest.java | 33 + ...dCachePartitionedAtomicQueueApiSelfTest.java | 13 +- ...micQueueFailoverDataConsistencySelfTest.java | 2 +- ...hePartitionedAtomicReferenceApiSelfTest.java | 19 +- ...titionedAtomicSequenceMultiThreadedTest.java | 72 +- ...achePartitionedAtomicStampedApiSelfTest.java | 23 +- ...dCachePartitionedCountDownLatchSelfTest.java | 33 + ...rtitionedDataStructuresFailoverSelfTest.java | 2 +- ...idCachePartitionedNodeRestartTxSelfTest.java | 15 +- .../GridCachePartitionedQueueApiSelfTest.java | 25 +- ...nedQueueFailoverDataConsistencySelfTest.java | 7 + ...ridCacheReplicatedAtomicLongApiSelfTest.java | 33 + ...cheReplicatedAtomicReferenceApiSelfTest.java | 16 +- ...CacheReplicatedAtomicStampedApiSelfTest.java | 15 +- ...idCacheReplicatedCountDownLatchSelfTest.java | 33 + ...eplicatedDataStructuresFailoverSelfTest.java | 2 +- .../GridCacheReplicatedQueueApiSelfTest.java | 22 +- .../testframework/junits/GridTestIgnite.java | 19 + .../IgniteCacheDataStructuresSelfTestSuite.java | 8 +- .../org/apache/ignite/IgniteSpringBean.java | 24 + 57 files changed, 2511 insertions(+), 2273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 26169f5..46ad532 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -372,6 +372,44 @@ public interface Ignite extends AutoCloseable { throws IgniteCheckedException; /** + * Will get a named queue from cache and create one if it has not been created yet and {@code create} flag + * is {@code true}. + * If queue is present in cache already, queue properties will not be changed. Use + * collocation for {@link CacheMode#PARTITIONED} caches if you have lots of relatively + * small queues as it will make fetching, querying, and iteration a lot faster. If you have + * few very large queues, then you should consider turning off collocation as they simply + * may not fit in a single node's memory. However note that in this case + * to get a single element off the queue all nodes may have to be queried. + * + * @param name Name of queue. + * @param cfg Queue configuration. + * @param cap Capacity of queue, {@code 0} for unbounded queue. + * @param create Boolean flag indicating whether data structure should be created if does not exist. + * @return Queue with given properties. + * @throws IgniteCheckedException If remove failed. + */ + @Nullable public <T> IgniteQueue<T> queue(String name, + IgniteCollectionConfiguration cfg, + int cap, + boolean create) + throws IgniteCheckedException; + + /** + * Will get a named set from cache and create one if it has not been created yet and {@code create} flag + * is {@code true}. + * + * @param name Set name. + * @param cfg Set configuration. + * @param create Flag indicating whether set should be created if does not exist. + * @return Set with given properties. + * @throws IgniteCheckedException If failed. + */ + @Nullable public <T> IgniteSet<T> set(String name, + IgniteCollectionConfiguration cfg, + boolean create) + throws IgniteCheckedException; + + /** * Gets an instance of deployed Ignite plugin. * * @param name Plugin name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java index cefef41..5046207 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicLong.java @@ -19,6 +19,8 @@ package org.apache.ignite; import org.apache.ignite.*; +import java.io.*; + /** * This interface provides a rich API for working with distributedly cached atomic long value. * <p> @@ -57,12 +59,11 @@ import org.apache.ignite.*; * <h1 class="header">Creating Distributed Atomic Long</h1> * Instance of distributed atomic long can be created by calling the following method: * <ul> - * <li>{@link org.apache.ignite.cache.datastructures.CacheDataStructures#atomicLong(String, long, boolean)}</li> + * <li>{@link Ignite#atomicLong(String, long, boolean)}</li> * </ul> - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicLong(String, long, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicLong(String) + * @see Ignite#atomicLong(String, long, boolean) */ -public interface IgniteAtomicLong { +public interface IgniteAtomicLong extends Closeable { /** * Name of atomic long. * @@ -154,4 +155,9 @@ public interface IgniteAtomicLong { * @return {@code true} if atomic was removed from cache, {@code false} in other case. */ public boolean removed(); + + /** + * Removes this atomic long. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java index 3aaa219..b247504 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicReference.java @@ -19,6 +19,8 @@ package org.apache.ignite; import org.apache.ignite.*; +import java.io.*; + /** * This interface provides a rich API for working with distributed atomic reference. * <p> @@ -44,12 +46,11 @@ import org.apache.ignite.*; * <h1 class="header">Creating Distributed Atomic Reference</h1> * Instance of distributed atomic reference can be created by calling the following method: * <ul> - * <li>{@link org.apache.ignite.cache.datastructures.CacheDataStructures#atomicReference(String, Object, boolean)}</li> + * <li>{@link Ignite#atomicReference(String, Object, boolean)}</li> * </ul> - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicReference(String, Object, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicReference(String) + * @see Ignite#atomicReference(String, Object, boolean) */ -public interface IgniteAtomicReference<T> { +public interface IgniteAtomicReference<T> extends Closeable { /** * Name of atomic reference. * @@ -90,4 +91,9 @@ public interface IgniteAtomicReference<T> { * @return {@code true} if an atomic reference was removed from cache, {@code false} otherwise. */ public boolean removed(); + + /** + * Removes this atomic reference. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java index 864059d..62b930a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicSequence.java @@ -56,8 +56,7 @@ import java.io.*; * <ul> * <li>{@link Ignite#atomicSequence(String, long, boolean)}</li> * </ul> - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicSequence(String, long, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicSequence(String) + * @see Ignite#atomicSequence(String, long, boolean) */ public interface IgniteAtomicSequence extends Closeable { /** @@ -131,7 +130,7 @@ public interface IgniteAtomicSequence extends Closeable { public boolean removed(); /** - * Removes atomic sequence. + * Removes this atomic sequence. */ - @Override void close(); + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java index dfa3953..6f9b0b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteAtomicStamped.java @@ -20,6 +20,8 @@ package org.apache.ignite; import org.apache.ignite.*; import org.apache.ignite.lang.*; +import java.io.*; + /** * This interface provides a rich API for working with distributed atomic stamped value. * <p> @@ -53,12 +55,11 @@ import org.apache.ignite.lang.*; * <h1 class="header">Creating Distributed Atomic Stamped</h1> * Instance of distributed atomic stamped can be created by calling the following method: * <ul> - * <li>{@link org.apache.ignite.cache.datastructures.CacheDataStructures#atomicLong(String, long, boolean)}</li> + * <li>{@link Ignite#atomicLong(String, long, boolean)}</li> * </ul> - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicStamped(String, Object, Object, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicStamped(String) + * @see Ignite#atomicStamped(String, Object, Object, boolean) */ -public interface IgniteAtomicStamped<T, S> { +public interface IgniteAtomicStamped<T, S> extends Closeable { /** * Name of atomic stamped. * @@ -118,4 +119,9 @@ public interface IgniteAtomicStamped<T, S> { * @return {@code true} if atomic stamped was removed from cache, {@code false} otherwise. */ public boolean removed(); + + /** + * Removes this atomic stamped. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteCountDownLatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCountDownLatch.java b/modules/core/src/main/java/org/apache/ignite/IgniteCountDownLatch.java index 43ffe08..c09dfd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCountDownLatch.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCountDownLatch.java @@ -19,6 +19,7 @@ package org.apache.ignite; import org.apache.ignite.*; +import java.io.*; import java.util.concurrent.*; /** @@ -30,11 +31,9 @@ import java.util.concurrent.*; * counted down to zero first. * <h1 class="header">Creating Distributed Count Down Latch</h1> * Instance of cache count down latch can be created by calling the following method: - * {@link org.apache.ignite.cache.datastructures.CacheDataStructures#countDownLatch(String, int, boolean, boolean)}. - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#countDownLatch(String, int, boolean, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeCountDownLatch(String) + * {@link Ignite#countDownLatch(String, int, boolean, boolean)}. */ -public interface IgniteCountDownLatch { +public interface IgniteCountDownLatch extends Closeable { /** * Gets name of the latch. * @@ -223,4 +222,9 @@ public interface IgniteCountDownLatch { * @return {@code True} if latch was removed from cache, {@code false} otherwise. */ public boolean removed(); + + /** + * Removes this count down latch. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java index 31dafef..6d4b9c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteQueue.java @@ -17,9 +17,10 @@ package org.apache.ignite; -import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -66,13 +67,11 @@ import java.util.concurrent.*; * Instances of distributed cache queues can be created by calling the following method * on {@link org.apache.ignite.cache.datastructures.CacheDataStructures} API: * <ul> - * <li>{@link org.apache.ignite.cache.datastructures.CacheDataStructures#queue(String, int, boolean, boolean)}</li> + * <li>{@link Ignite#queue(String, IgniteCollectionConfiguration, int, boolean)}</li> * </ul> - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#queue(String, int, boolean, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeQueue(String) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeQueue(String, int) + * @see Ignite#queue(String, IgniteCollectionConfiguration, int, boolean) */ -public interface IgniteQueue<T> extends BlockingQueue<T> { +public interface IgniteQueue<T> extends BlockingQueue<T>, Closeable { /** * Gets queue name. * @@ -179,4 +178,9 @@ public interface IgniteQueue<T> extends BlockingQueue<T> { * @return {@code true} if queue was removed from cache {@code false} otherwise. */ public boolean removed(); + + /** + * Removes this queue. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/IgniteSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java index 9dafd33..0584a16 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSet.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSet.java @@ -17,8 +17,9 @@ package org.apache.ignite; -import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import java.io.*; import java.util.*; /** @@ -32,10 +33,9 @@ import java.util.*; * (governed by {@code collocated} parameter). {@code Non-collocated} mode is provided only * for partitioned caches. If {@code collocated} parameter is {@code true}, then all set items * will be collocated on one node, otherwise items will be distributed through all grid nodes. - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#set(String, boolean, boolean) - * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeSet(String) + * @see Ignite#set(String, IgniteCollectionConfiguration, boolean) */ -public interface IgniteSet<T> extends Set<T> { +public interface IgniteSet<T> extends Set<T>, Closeable { /** * Gets set name. * @@ -58,4 +58,9 @@ public interface IgniteSet<T> extends Set<T> { * @return {@code True} if set was removed from cache {@code false} otherwise. */ public boolean removed(); + + /** + * Removes this set. + */ + @Override public void close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/configuration/IgniteAtomicConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteAtomicConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteAtomicConfiguration.java index 5e616d7..893f8f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteAtomicConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteAtomicConfiguration.java @@ -41,21 +41,33 @@ public class IgniteAtomicConfiguration { /** Cache mode. */ private CacheMode cacheMode = DFLT_CACHE_MODE; - /** */ + /** Number of backups. */ private int backups = DFLT_BACKUPS; + /** + * @return + */ public int getBackups() { return backups; } + /** + * @param backups + */ public void setBackups(int backups) { this.backups = backups; } + /** + * @return + */ public CacheMode getCacheMode() { return cacheMode; } + /** + * @param cacheMode + */ public void setCacheMode(CacheMode cacheMode) { this.cacheMode = cacheMode; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/configuration/IgniteCollectionConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteCollectionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteCollectionConfiguration.java new file mode 100644 index 0000000..cfd3de3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteCollectionConfiguration.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class IgniteCollectionConfiguration { + /** Default backups number. */ + public static final int DFLT_BACKUPS = 0; + + /** Default cache mode. */ + public static final CacheMode DFLT_CACHE_MODE = PARTITIONED; + + /** Default atomicity mode. */ + public static final CacheAtomicityMode DFLT_ATOMICITY_MODE = ATOMIC; + + /** Default memory mode. */ + public static final CacheMemoryMode DFLT_MEMORY_MODE = ONHEAP_TIERED; + + /** Default distribution mode. */ + public static final CacheDistributionMode DFLT_DISTRIBUTION_MODE = PARTITIONED_ONLY; + + /** Cache mode. */ + private CacheMode cacheMode = DFLT_CACHE_MODE; + + /** Cache distribution mode. */ + private CacheDistributionMode distro = DFLT_DISTRIBUTION_MODE; + + /** Number of backups. */ + private int backups = DFLT_BACKUPS; + + /** Atomicity mode. */ + private CacheAtomicityMode atomicityMode = DFLT_ATOMICITY_MODE; + + /** Memory mode. */ + private CacheMemoryMode memMode = DFLT_MEMORY_MODE; + + /** */ + private boolean collocated; + + /** + * @return {@code True} if all items within the same collection will be collocated on the same node. + */ + public boolean isCollocated() { + return collocated; + } + + /** + * @param collocated If {@code true} then all items within the same collection will be collocated on the same node. + * Otherwise elements of the same set maybe be cached on different nodes. This parameter works only + * collections stored in {@link CacheMode#PARTITIONED} cache. + */ + public void setCollocated(boolean collocated) { + this.collocated = collocated; + } + + /** + * @return Number of cache backups. + */ + public int getBackups() { + return backups; + } + + /** + * @param backups Number of cache backups. + */ + public void setBackups(int backups) { + this.backups = backups; + } + + /** + * @return Cache mode. + */ + public CacheMode getCacheMode() { + return cacheMode; + } + + /** + * @param cacheMode Cache mode. + */ + public void setCacheMode(CacheMode cacheMode) { + this.cacheMode = cacheMode; + } + + /** + * @return Cache atomicity mode. + */ + public CacheAtomicityMode getAtomicityMode() { + return atomicityMode; + } + + /** + * @param atomicityMode Cache atomicity mode. + */ + public void setAtomicityMode(CacheAtomicityMode atomicityMode) { + this.atomicityMode = atomicityMode; + } + + /** + * @return Cache memory mode. + */ + public CacheMemoryMode getMemoryMode() { + return memMode; + } + + /** + * @param memMode Cache memory mode. + */ + public void setMemoryMode(CacheMemoryMode memMode) { + this.memMode = memMode; + } + + /** + * @return Cache distribution mode. + */ + public CacheDistributionMode getDistributionMode() { + return distro; + } + + /** + * @param distro Cache distribution mode. + */ + public void setDistributionMode(CacheDistributionMode distro) { + this.distro = distro; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java index 3285bd9..c924890 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java @@ -3298,6 +3298,38 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe } } + /** {@inheritDoc} */ + @Nullable @Override public <T> IgniteQueue<T> queue(String name, + IgniteCollectionConfiguration cfg, + int cap, + boolean create) throws IgniteCheckedException + { + guard(); + + try { + return ctx.dataStructures().queue(name, cfg, cap, create); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> IgniteSet<T> set(String name, + IgniteCollectionConfiguration cfg, + boolean create) + throws IgniteCheckedException + { + guard(); + + try { + return ctx.dataStructures().set(name, cfg, create); + } + finally { + unguard(); + } + } + /** * Creates optional component. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 9598bdf..f6cb489 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -17,15 +17,21 @@ package org.apache.ignite.internal.processors.cache.datastructures; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * @@ -35,6 +41,146 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap = new ConcurrentHashMap8<>(); + /** Queue header view. */ + private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; + + /** Query notifying about queue update. */ + private GridCacheContinuousQueryAdapter queueQry; + + /** Queue query creation guard. */ + private final AtomicBoolean queueQryGuard = new AtomicBoolean(); + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Init latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** Init flag. */ + private boolean initFlag; + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + try { + queueHdrView = cctx.cache().projection(GridCacheQueueHeaderKey.class, GridCacheQueueHeader.class); + + initFlag = true; + } + finally { + initLatch.countDown(); + } + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + busyLock.block(); + + if (queueQry != null) { + try { + queueQry.close(); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to cancel queue header query.", e); + } + } + } + + /** + * @throws IgniteCheckedException If thread is interrupted or manager + * was not successfully initialized. + */ + private void waitInitialization() throws IgniteCheckedException { + if (initLatch.getCount() > 0) + U.await(initLatch); + + if (!initFlag) + throw new IgniteCheckedException("DataStructures processor was not properly initialized."); + } + + /** + * @param name Queue name. + * @param cap Capacity. + * @param colloc Collocated flag. + * @param create Create flag. + * @return Queue header. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + @Nullable public GridCacheQueueHeader queue(final String name, + final int cap, + boolean colloc, + final boolean create) + throws IgniteCheckedException + { + waitInitialization(); + + cctx.gate().enter(); + + try { + GridCacheQueueHeaderKey key = new GridCacheQueueHeaderKey(name); + + GridCacheQueueHeader hdr; + + if (create) { + hdr = new GridCacheQueueHeader(IgniteUuid.randomUuid(), cap, colloc, 0, 0, null); + + GridCacheQueueHeader old = queueHdrView.putIfAbsent(key, hdr); + + if (old != null) { + if (old.capacity() != cap || old.collocated() != colloc) + throw new IgniteCheckedException("Failed to create queue, queue with the same name but different " + + "configuration already exists [name=" + name + ']'); + + hdr = old; + } + } + else + hdr = queueHdrView.get(key); + + if (hdr == null) + return null; + + if (queueQryGuard.compareAndSet(false, true)) { + queueQry = (GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery(); + + queueQry.filter(new QueueHeaderPredicate()); + + queueQry.localCallback(new IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry>>() { + @Override public boolean apply(UUID id, Collection<GridCacheContinuousQueryEntry> entries) { + if (!busyLock.enterBusy()) + return false; + + try { + for (GridCacheContinuousQueryEntry e : entries) { + GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); + GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); + GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); + + cctx.kernalContext().dataStructures().onQueueUpdated(key, hdr, oldHdr); + } + + return true; + } + finally { + busyLock.leaveBusy(); + } + } + }); + + queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, + true, + false, + false, + true); + } + + return hdr; + } + finally { + cctx.gate().leave(); + } + } + /** * Entry update callback. * @@ -97,4 +243,34 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, else set.add(key); } + + /** + * Predicate for queue continuous query. + */ + private static class QueueHeaderPredicate implements IgniteBiPredicate, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Required by {@link Externalizable}. + */ + public QueueHeaderPredicate() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean apply(Object key, Object val) { + return key instanceof GridCacheQueueHeaderKey; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) { + // No-op. + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/349d51ed/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresConfigurationKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresConfigurationKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresConfigurationKey.java new file mode 100644 index 0000000..a2bec2b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresConfigurationKey.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.internal.processors.cache.*; + +import java.io.*; + +/** + * Key used to store in utility cache information about all created data structures. + */ +public class CacheDataStructuresConfigurationKey implements GridCacheInternal, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public CacheDataStructuresConfigurationKey() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj == this || (obj instanceof CacheDataStructuresConfigurationKey); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "CacheDataStructuresConfigurationKey []"; + } +}