# ignite-6
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9e893982 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9e893982 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9e893982 Branch: refs/heads/ignite-6 Commit: 9e8939828649b522c24b08970968037be4bb7bb4 Parents: 368dd63 Author: sboikov <semen.boi...@inria.fr> Authored: Tue Feb 3 23:17:32 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Wed Feb 4 00:41:55 2015 +0300 ---------------------------------------------------------------------- .../datastructures/CacheQueueExample.java | 60 +- .../datastructures/CacheSetExample.java | 38 +- .../src/main/java/org/apache/ignite/Ignite.java | 26 +- .../java/org/apache/ignite/cache/GridCache.java | 1 - .../processors/cache/GridCacheProxyImpl.java | 1 - .../CacheDataStructuresManager.java | 50 +- .../CacheDataStructuresProcessor.java | 1293 ++++++++++++------ .../datastructures/GridCacheQueueAdapter.java | 20 +- .../datastructures/GridCacheSetImpl.java | 2 +- ...ridCacheQueueJoinedNodeSelfAbstractTest.java | 2 +- .../IgniteDataStructureUniqueNameTest.java | 164 ++- 11 files changed, 1124 insertions(+), 533 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java index f618c24..4ae9395 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java @@ -35,6 +35,9 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public class CacheQueueExample { + /** Cache name. */ + private static final String CACHE_NAME = "partitioned_tx"; + /** Number of retries */ private static final int RETRIES = 20; @@ -45,9 +48,9 @@ public class CacheQueueExample { * Executes example. * * @param args Command line arguments, none required. - * @throws IgniteCheckedException If example execution failed. + * @throws Exception If example execution failed. */ - public static void main(String[] args) throws IgniteCheckedException { + public static void main(String[] args) throws Exception { try (Ignite g = Ignition.start("examples/config/example-cache.xml")) { System.out.println(); System.out.println(">>> Cache queue example started."); @@ -73,11 +76,13 @@ public class CacheQueueExample { * @param g Grid. * @param queueName Name of queue. * @return Queue. - * @throws IgniteCheckedException If execution failed. + * @throws IgniteException If execution failed. */ - private static IgniteQueue<String> initializeQueue(Ignite g, String queueName) throws IgniteCheckedException { + private static IgniteQueue<String> initializeQueue(Ignite g, String queueName) throws IgniteException { IgniteCollectionConfiguration colCfg = new IgniteCollectionConfiguration(); + colCfg.setCacheName(CACHE_NAME); + // Initialize new FIFO queue. IgniteQueue<String> queue = g.queue(queueName, colCfg, 0, true); @@ -95,9 +100,9 @@ public class CacheQueueExample { * Read items from head and tail of queue. * * @param g Grid. - * @throws IgniteCheckedException If failed. + * @throws IgniteException If failed. */ - private static void readFromQueue(Ignite g) throws IgniteCheckedException { + private static void readFromQueue(Ignite g) throws IgniteException { final String queueName = queue.name(); // Read queue items on each node. @@ -110,9 +115,9 @@ public class CacheQueueExample { * Write items into queue. * * @param g Grid. - * @throws IgniteCheckedException If failed. + * @throws IgniteException If failed. */ - private static void writeToQueue(Ignite g) throws IgniteCheckedException { + private static void writeToQueue(Ignite g) throws IgniteException { final String queueName = queue.name(); // Write queue items on each node. @@ -132,9 +137,9 @@ public class CacheQueueExample { * Clear and remove queue. * * @param g Grid. - * @throws IgniteCheckedException If execution failed. + * @throws IgniteException If execution failed. */ - private static void clearAndRemoveQueue(Ignite g) throws IgniteCheckedException { + private static void clearAndRemoveQueue(Ignite g) throws IgniteException { System.out.println("Queue size before clearing: " + queue.size()); // Clear queue. @@ -175,32 +180,27 @@ public class CacheQueueExample { /** {@inheritDoc} */ @Override public void run() { - try { - IgniteQueue<String> queue = Ignition.ignite().queue(queueName, null, 0, false); + IgniteQueue<String> queue = Ignition.ignite().queue(queueName, null, 0, false); - if (put) { - UUID locId = Ignition.ignite().cluster().localNode().id(); + if (put) { + UUID locId = Ignition.ignite().cluster().localNode().id(); - for (int i = 0; i < RETRIES; i++) { - String item = locId + "_" + Integer.toString(i); + for (int i = 0; i < RETRIES; i++) { + String item = locId + "_" + Integer.toString(i); - queue.put(item); + queue.put(item); - System.out.println("Queue item has been added: " + item); - } - } - else { - // Take items from queue head. - for (int i = 0; i < RETRIES; i++) - System.out.println("Queue item has been read from queue head: " + queue.take()); - - // Take items from queue head once again. - for (int i = 0; i < RETRIES; i++) - System.out.println("Queue item has been read from queue head: " + queue.poll()); + System.out.println("Queue item has been added: " + item); } } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); + else { + // Take items from queue head. + for (int i = 0; i < RETRIES; i++) + System.out.println("Queue item has been read from queue head: " + queue.take()); + + // Take items from queue head once again. + for (int i = 0; i < RETRIES; i++) + System.out.println("Queue item has been read from queue head: " + queue.poll()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheSetExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheSetExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheSetExample.java index adc1b44..a961f30 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheSetExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheSetExample.java @@ -34,6 +34,9 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public class CacheSetExample { + /** Cache name. */ + private static final String CACHE_NAME = "partitioned_tx"; + /** Set instance. */ private static IgniteSet<String> set; @@ -41,9 +44,9 @@ public class CacheSetExample { * Executes example. * * @param args Command line arguments, none required. - * @throws IgniteCheckedException If example execution failed. + * @throws Exception If example execution failed. */ - public static void main(String[] args) throws IgniteCheckedException { + public static void main(String[] args) throws Exception { try (Ignite g = Ignition.start("examples/config/example-cache.xml")) { System.out.println(); System.out.println(">>> Cache set example started."); @@ -67,11 +70,13 @@ public class CacheSetExample { * @param g Grid. * @param setName Name of set. * @return Set. - * @throws IgniteCheckedException If execution failed. + * @throws IgniteException If execution failed. */ - private static IgniteSet<String> initializeSet(Ignite g, String setName) throws IgniteCheckedException { + private static IgniteSet<String> initializeSet(Ignite g, String setName) throws IgniteException { IgniteCollectionConfiguration setCfg = new IgniteCollectionConfiguration(); + setCfg.setCacheName(CACHE_NAME); + // Initialize new set. IgniteSet<String> set = g.set(setName, setCfg, true); @@ -88,9 +93,9 @@ public class CacheSetExample { * Write items into set. * * @param g Grid. - * @throws IgniteCheckedException If failed. + * @throws IgniteException If failed. */ - private static void writeToSet(Ignite g) throws IgniteCheckedException { + private static void writeToSet(Ignite g) throws IgniteException { final String setName = set.name(); // Write set items on each node. @@ -126,9 +131,9 @@ public class CacheSetExample { * Clear and remove set. * * @param g Grid. - * @throws IgniteCheckedException If execution failed. + * @throws IgniteException If execution failed. */ - private static void clearAndRemoveSet(Ignite g) throws IgniteCheckedException { + private static void clearAndRemoveSet(Ignite g) throws IgniteException { System.out.println("Set size before clearing: " + set.size()); // Clear set. @@ -166,21 +171,16 @@ public class CacheSetExample { /** {@inheritDoc} */ @Override public void run() { - try { - IgniteSet<String> set = Ignition.ignite().set(setName, null, false); + IgniteSet<String> set = Ignition.ignite().set(setName, null, false); - UUID locId = Ignition.ignite().cluster().localNode().id(); + UUID locId = Ignition.ignite().cluster().localNode().id(); - for (int i = 0; i < 5; i++) { - String item = locId + "_" + Integer.toString(i); + for (int i = 0; i < 5; i++) { + String item = locId + "_" + Integer.toString(i); - set.add(item); + set.add(item); - System.out.println("Set item has been added: " + item); - } - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); + System.out.println("Set item has been added: " + item); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 927ff67..302fce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -305,7 +305,7 @@ public interface Ignite extends AutoCloseable { * is {@code true}. * * @param name Sequence name. - * @param initVal Initial value for sequence. If sequence already cached, {@code initVal} will be ignored. + * @param initVal Initial value for sequence. Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Sequence for the given name. * @throws IgniteException If sequence could not be fetched or created. @@ -318,8 +318,7 @@ public interface Ignite extends AutoCloseable { * is {@code true}. * * @param name Name of atomic long. - * @param initVal Initial value for atomic long. If atomic long already cached, {@code initVal} - * will be ignored. + * @param initVal Initial value for atomic long. Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Atomic long. * @throws IgniteException If atomic long could not be fetched or created. @@ -332,8 +331,7 @@ public interface Ignite extends AutoCloseable { * is {@code true}. * * @param name Atomic reference name. - * @param initVal Initial value for atomic reference. If atomic reference already cached, - * {@code initVal} will be ignored. + * @param initVal Initial value for atomic reference. Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Atomic reference for the given name. * @throws IgniteException If atomic reference could not be fetched or created. @@ -346,10 +344,8 @@ public interface Ignite extends AutoCloseable { * is {@code true}. * * @param name Atomic stamped name. - * @param initVal Initial value for atomic stamped. If atomic stamped already cached, - * {@code initVal} will be ignored. - * @param initStamp Initial stamp for atomic stamped. If atomic stamped already cached, - * {@code initStamp} will be ignored. + * @param initVal Initial value for atomic stamped. Ignored if {@code create} flag is {@code false}. + * @param initStamp Initial stamp for atomic stamped. Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Atomic stamped for the given name. * @throws IgniteException If atomic stamped could not be fetched or created. @@ -362,9 +358,9 @@ public interface Ignite extends AutoCloseable { * is {@code true}, it is created using provided name and count parameter. * * @param name Name of the latch. - * @param cnt Count for new latch creation. - * @param autoDel {@code True} to automatically delete latch from cache - * when its count reaches zero. + * @param cnt Count for new latch creation. Ignored if {@code create} flag is {@code false}. + * @param autoDel {@code True} to automatically delete latch from cache when its count reaches zero. + * Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Count down latch for the given name. * @throws IgniteException If latch could not be fetched or created. @@ -383,8 +379,8 @@ public interface Ignite extends AutoCloseable { * to get a single element off the queue all nodes may have to be queried. * * @param name Name of queue. - * @param cfg Queue configuration. - * @param cap Capacity of queue, {@code 0} for unbounded queue. + * @param cfg Queue configuration. Ignored if {@code create} flag is {@code false}. + * @param cap Capacity of queue, {@code 0} for unbounded queue. Ignored if {@code create} flag is {@code false}. * @param create Boolean flag indicating whether data structure should be created if does not exist. * @return Queue with given properties. * @throws IgniteException If queue could not be fetched or created. @@ -400,7 +396,7 @@ public interface Ignite extends AutoCloseable { * is {@code true}. * * @param name Set name. - * @param cfg Set configuration. + * @param cfg Set configuration. Ignored if {@code create} flag is {@code false}. * @param create Flag indicating whether set should be created if does not exist. * @return Set with given properties. * @throws IgniteException If set could not be fetched or created. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/cache/GridCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/GridCache.java b/modules/core/src/main/java/org/apache/ignite/cache/GridCache.java index 910c9d4..6eafb42 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/GridCache.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/GridCache.java @@ -19,7 +19,6 @@ package org.apache.ignite.cache; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 52c6c32..ad9c6c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9e893982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 4133a0b..d47b40f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; @@ -142,14 +143,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, // Non collocated mode enabled only for PARTITIONED cache. final boolean colloc0 = create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); - if (cctx.atomic()) - return queue0(name, cap, colloc0, create); - - return CU.outTx(new Callable<GridCacheQueueProxy<T>>() { - @Nullable @Override public GridCacheQueueProxy<T> call() throws Exception { - return queue0(name, cap, colloc0, create); - } - }, cctx); + return queue0(name, cap, colloc0, create); } /** @@ -308,14 +302,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, final boolean colloc0 = create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); - if (cctx.atomic()) - return set0(name, colloc0, create); - - return CU.outTx(new Callable<IgniteSet<T>>() { - @Nullable @Override public IgniteSet<T> call() throws Exception { - return set0(name, colloc0, create); - } - }, cctx); + return set0(name, colloc0, create); } /** @@ -427,20 +414,13 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** - * @param name Set name. + * @param id Set ID. * @return {@code True} if set was removed. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public boolean removeSet(String name) throws IgniteCheckedException { - GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); - - GridCache cache = cctx.cache(); - - GridCacheSetHeader hdr = retryRemove(cache, key); - - if (hdr == null) - return false; + public void removeSetData(IgniteUuid id) throws IgniteCheckedException { + assert id != null; if (!cctx.isLocal()) { while (true) { @@ -450,11 +430,11 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, try { cctx.closures().callAsyncNoFailover(BROADCAST, - new BlockSetCallable(cctx.name(), hdr.id()), + new BlockSetCallable(cctx.name(), id), nodes, true).get(); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) log.debug("BlockSet job failed, will retry: " + e); @@ -463,11 +443,11 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, try { cctx.closures().callAsyncNoFailover(BROADCAST, - new RemoveSetDataCallable(cctx.name(), hdr.id(), topVer), + new RemoveSetDataCallable(cctx.name(), id, topVer), nodes, true).get(); } - catch (ClusterTopologyException e) { + catch (ClusterTopologyCheckedException e) { if (log.isDebugEnabled()) log.debug("RemoveSetData job failed, will retry: " + e); @@ -479,12 +459,10 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } } else { - blockSet(hdr.id()); + blockSet(id); - cctx.dataStructures().removeSetData(hdr.id(), 0); + cctx.dataStructures().removeSetData(id, 0); } - - return true; } /** @@ -549,9 +527,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, @SuppressWarnings("unchecked") @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException { return CacheDataStructuresProcessor.retry(log, new Callable<T>() { - @Nullable - @Override - public T call() throws Exception { + @Nullable @Override public T call() throws Exception { return (T) cache.remove(key); } });