# ignite-6
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e3c81b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e3c81b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e3c81b4 Branch: refs/heads/ignite-6 Commit: 7e3c81b48bd7879072b859f26140e5194d58f654 Parents: 67a50f9 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jan 28 12:11:15 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jan 28 14:56:25 2015 +0300 ---------------------------------------------------------------------- .../datastructures/CacheAtomicLongExample.java | 9 +- .../CacheAtomicReferenceExample.java | 20 +- .../CacheAtomicStampedExample.java | 19 +- .../CacheCountDownLatchExample.java | 18 +- .../datastructures/CacheQueueExample.java | 4 +- .../client/memcache/MemcacheRestExample.java | 2 +- .../rest/ClientMemcachedProtocolSelfTest.java | 8 +- .../JettyRestProcessorAbstractSelfTest.java | 21 +- .../rest/RestMemcacheProtocolSelfTest.java | 60 +-- .../processors/rest/TestMemcacheClient.java | 10 +- .../org/apache/ignite/internal/GridGainEx.java | 2 +- .../processors/cache/GridCacheProxyImpl.java | 7 +- .../CacheDataStructuresManager.java | 422 ++++++++++++++++++- .../CacheDataStructuresProcessor.java | 405 +----------------- .../datastructures/GridCacheQueueAdapter.java | 8 +- .../datastructures/GridCacheSetImpl.java | 2 +- .../datastructures/GridCacheSetProxy.java | 2 +- .../processors/rest/GridRestProcessor.java | 13 +- .../handlers/cache/GridCacheCommandHandler.java | 102 ----- .../DatastructuresCommandHandler.java | 108 +++++ .../tcp/GridTcpMemcachedNioListener.java | 56 ++- .../rest/request/DatastructuresRequest.java | 74 ++++ .../rest/request/GridRestCacheRequest.java | 37 +- .../GridCacheQueryInternalKeysSelfTest.java | 2 +- .../GridCacheReferenceCleanupSelfTest.java | 2 +- ...eAbstractDataStructuresFailoverSelfTest.java | 43 +- 
...actQueueFailoverDataConsistencySelfTest.java | 2 +- ...cheAtomicReferenceMultiNodeAbstractTest.java | 60 +-- .../GridCacheQueueCleanupSelfTest.java | 48 ++- ...ridCacheQueueJoinedNodeSelfAbstractTest.java | 31 +- ...GridCacheQueueMultiNodeAbstractSelfTest.java | 15 +- ...dCacheQueueMultiNodeConsistencySelfTest.java | 134 ++---- .../GridCacheSetAbstractSelfTest.java | 20 +- .../GridCacheSetFailoverAbstractSelfTest.java | 34 +- .../IgniteCollectionAbstractTest.java | 6 +- ...ionedAtomicQueueCreateMultiNodeSelfTest.java | 10 +- ...PartitionedAtomicReferenceMultiNodeTest.java | 19 +- ...chePartitionedAtomicSetFailoverSelfTest.java | 8 +- ...rtitionedDataStructuresFailoverSelfTest.java | 8 +- ...PartitionedQueueCreateMultiNodeSelfTest.java | 47 ++- ...dCachePartitionedQueueEntryMoveSelfTest.java | 56 +-- ...CachePartitionedQueueJoinedNodeSelfTest.java | 25 +- ...GridCachePartitionedSetFailoverSelfTest.java | 6 +- ...eReplicatedAtomicReferenceMultiNodeTest.java | 14 +- ...eplicatedDataStructuresFailoverSelfTest.java | 7 +- .../dht/GridCacheDhtInternalEntrySelfTest.java | 33 +- .../local/GridCacheLocalFullApiSelfTest.java | 3 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 3 +- .../cache/GridCacheDataStructuresLoadTest.java | 202 ++++++--- .../marshaller/GridMarshallerAbstractTest.java | 30 -- .../GridCacheAbstractFieldsQuerySelfTest.java | 5 +- .../http/jetty/GridJettyRestHandler.java | 23 +- .../tests/ScalarAffinityRoutingSpec.scala | 5 +- 53 files changed, 1156 insertions(+), 1154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicLongExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicLongExample.java 
b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicLongExample.java index cff2aa8..279a571 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicLongExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicLongExample.java @@ -33,9 +33,6 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public final class CacheAtomicLongExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned_tx"; - /** Number of retries */ private static final int RETRIES = 20; @@ -48,20 +45,20 @@ public final class CacheAtomicLongExample { public static void main(String[] args) throws IgniteCheckedException { try (Ignite g = Ignition.start("examples/config/example-cache.xml")) { System.out.println(); - System.out.println(">>> Cache atomic long example started."); + System.out.println(">>> Atomic long example started."); // Make name for atomic long (by which it will be known in the grid). String atomicName = UUID.randomUUID().toString(); // Initialize atomic long in grid. - final IgniteAtomicLong atomicLong = g.cache(CACHE_NAME).dataStructures().atomicLong(atomicName, 0, true); + final IgniteAtomicLong atomicLong = g.atomicLong(atomicName, 0, true); System.out.println(); System.out.println("Atomic long initial value : " + atomicLong.get() + '.'); // Try increment atomic long from all grid nodes. // Note that this node is also part of the grid. 
- g.compute(g.cluster().forCache(CACHE_NAME)).call(new IgniteCallable<Object>() { + g.compute().broadcast(new IgniteCallable<Object>() { @Override public Object call() throws Exception { for (int i = 0; i < RETRIES; i++) System.out.println("AtomicLong value has been incremented: " + atomicLong.incrementAndGet()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicReferenceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicReferenceExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicReferenceExample.java index 1821320..04576fd 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicReferenceExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicReferenceExample.java @@ -33,9 +33,6 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public final class CacheAtomicReferenceExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned_tx"; - /** * Executes example. * @@ -45,7 +42,7 @@ public final class CacheAtomicReferenceExample { public static void main(String[] args) throws IgniteCheckedException { try (Ignite g = Ignition.start("examples/config/example-cache.xml")) { System.out.println(); - System.out.println(">>> Cache atomic reference example started."); + System.out.println(">>> Atomic reference example started."); // Make name of atomic reference. final String refName = UUID.randomUUID().toString(); @@ -54,13 +51,12 @@ public final class CacheAtomicReferenceExample { String val = UUID.randomUUID().toString(); // Initialize atomic reference in grid. 
- IgniteAtomicReference<String> ref = g.cache(CACHE_NAME).dataStructures(). - atomicReference(refName, val, true); + IgniteAtomicReference<String> ref = g.atomicReference(refName, val, true); System.out.println("Atomic reference initial value : " + ref.get() + '.'); // Make closure for checking atomic reference value on grid. - Runnable c = new ReferenceClosure(CACHE_NAME, refName); + Runnable c = new ReferenceClosure(refName); // Check atomic reference on all grid nodes. g.compute().run(c); @@ -94,26 +90,20 @@ public final class CacheAtomicReferenceExample { * Obtains atomic reference. */ private static class ReferenceClosure implements IgniteRunnable { - /** Cache name. */ - private final String cacheName; - /** Reference name. */ private final String refName; /** - * @param cacheName Cache name. * @param refName Reference name. */ - ReferenceClosure(String cacheName, String refName) { - this.cacheName = cacheName; + ReferenceClosure(String refName) { this.refName = refName; } /** {@inheritDoc} */ @Override public void run() { try { - IgniteAtomicReference<String> ref = Ignition.ignite().cache(cacheName).dataStructures(). 
- atomicReference(refName, null, true); + IgniteAtomicReference<String> ref = Ignition.ignite().atomicReference(refName, null, true); System.out.println("Atomic reference value is " + ref.get() + '.'); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicStampedExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicStampedExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicStampedExample.java index 35ae486..0704fc7 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicStampedExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheAtomicStampedExample.java @@ -33,9 +33,6 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public final class CacheAtomicStampedExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned_tx"; - /** * Executes example. * @@ -45,7 +42,7 @@ public final class CacheAtomicStampedExample { public static void main(String[] args) throws IgniteCheckedException { try (Ignite g = Ignition.start("examples/config/example-cache.xml")) { System.out.println(); - System.out.println(">>> Cache atomic stamped example started."); + System.out.println(">>> Atomic stamped example started."); // Make name of atomic stamped. String stampedName = UUID.randomUUID().toString(); @@ -57,13 +54,12 @@ public final class CacheAtomicStampedExample { String stamp = UUID.randomUUID().toString(); // Initialize atomic stamped in cache. - IgniteAtomicStamped<String, String> stamped = g.cache(CACHE_NAME).dataStructures(). 
- atomicStamped(stampedName, val, stamp, true); + IgniteAtomicStamped<String, String> stamped = g.atomicStamped(stampedName, val, stamp, true); System.out.println("Atomic stamped initial [value=" + stamped.value() + ", stamp=" + stamped.stamp() + ']'); // Make closure for checking atomic stamped on grid. - Runnable c = new StampedUpdateClosure(CACHE_NAME, stampedName); + Runnable c = new StampedUpdateClosure(stampedName); // Check atomic stamped on all grid nodes. g.compute().run(c); @@ -100,25 +96,20 @@ public final class CacheAtomicStampedExample { * Performs update of on an atomic stamped variable in cache. */ private static class StampedUpdateClosure implements IgniteRunnable { - /** Cache name. */ - private final String cacheName; - /** Atomic stamped variable name. */ private final String stampedName; /** - * @param cacheName Cache name. * @param stampedName Atomic stamped variable name. */ - StampedUpdateClosure(String cacheName, String stampedName) { - this.cacheName = cacheName; + StampedUpdateClosure(String stampedName) { this.stampedName = stampedName; } /** {@inheritDoc} */ @Override public void run() { try { - IgniteAtomicStamped<String, String> stamped = Ignition.ignite().cache(cacheName).dataStructures(). + IgniteAtomicStamped<String, String> stamped = Ignition.ignite(). 
atomicStamped(stampedName, null, null, true); System.out.println("Atomic stamped [value=" + stamped.value() + ", stamp=" + stamped.stamp() + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheCountDownLatchExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheCountDownLatchExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheCountDownLatchExample.java index ce12349..7fa74da 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheCountDownLatchExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheCountDownLatchExample.java @@ -33,9 +33,6 @@ import java.util.*; * start GridGain node with {@code examples/config/example-cache.xml} configuration. */ public class CacheCountDownLatchExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned_tx"; - /** Number of latch initial count */ private static final int INITIAL_COUNT = 10; @@ -54,14 +51,13 @@ public class CacheCountDownLatchExample { final String latchName = UUID.randomUUID().toString(); // Initialize count down latch in grid. - IgniteCountDownLatch latch = g.cache(CACHE_NAME).dataStructures(). - countDownLatch(latchName, INITIAL_COUNT, false, true); + IgniteCountDownLatch latch = g.countDownLatch(latchName, INITIAL_COUNT, false, true); System.out.println("Latch initial value: " + latch.count()); // Start waiting on the latch on all grid nodes. for (int i = 0; i < INITIAL_COUNT; i++) - g.compute().run(new LatchClosure(CACHE_NAME, latchName)); + g.compute().run(new LatchClosure(latchName)); // Wait for latch to go down which essentially means that all remote closures completed. 
latch.await(); @@ -78,26 +74,20 @@ public class CacheCountDownLatchExample { * Closure which simply waits on the latch on all nodes. */ private static class LatchClosure implements IgniteRunnable { - /** Cache name. */ - private final String cacheName; - /** Latch name. */ private final String latchName; /** - * @param cacheName Cache name. * @param latchName Latch name. */ - LatchClosure(String cacheName, String latchName) { - this.cacheName = cacheName; + LatchClosure(String latchName) { this.latchName = latchName; } /** {@inheritDoc} */ @Override public void run() { try { - IgniteCountDownLatch latch = Ignition.ignite().cache(cacheName).dataStructures(). - countDownLatch(latchName, 1, false, true); + IgniteCountDownLatch latch = Ignition.ignite().countDownLatch(latchName, 1, false, true); int newCnt = latch.countDown(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java index 3f273f0..aebb950 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/datastructures/CacheQueueExample.java @@ -142,8 +142,8 @@ public class CacheQueueExample { System.out.println("Queue size after clearing: " + queue.size()); - // Remove queue from cache. - g.cache(CACHE_NAME).dataStructures().removeQueue(queue.name()); + // Remove queue. + queue.close(); // Try to work with removed queue. 
try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/examples/src/main/java/org/apache/ignite/examples/misc/client/memcache/MemcacheRestExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/misc/client/memcache/MemcacheRestExample.java b/examples/src/main/java/org/apache/ignite/examples/misc/client/memcache/MemcacheRestExample.java index 7fc12a3..8be427d 100644 --- a/examples/src/main/java/org/apache/ignite/examples/misc/client/memcache/MemcacheRestExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/misc/client/memcache/MemcacheRestExample.java @@ -87,7 +87,7 @@ public class MemcacheRestExample { System.out.println(">>> Current cache size: " + cache.size() + " (expected: 0)."); // Create atomic long. - IgniteAtomicLong l = cache.dataStructures().atomicLong("atomicLong", 10, true); + IgniteAtomicLong l = g.atomicLong("atomicLong", 10, true); // Increment atomic long by 5 using Memcache client. 
if (client.incr("atomicLong", 5, 0) == 15) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java index c779eb8..b46e0cc 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java @@ -210,11 +210,11 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe public void testIncrement() throws Exception { Assert.assertEquals(5, client.incr("incrKey", 3, 2)); - assertEquals(5, cache().dataStructures().atomicLong("incrKey", 0, true).get()); + assertEquals(5, grid(0).atomicLong("incrKey", 0, true).get()); Assert.assertEquals(15, client.incr("incrKey", 10, 0)); - assertEquals(15, cache().dataStructures().atomicLong("incrKey", 0, true).get()); + assertEquals(15, grid(0).atomicLong("incrKey", 0, true).get()); } /** @@ -223,11 +223,11 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe public void testDecrement() throws Exception { Assert.assertEquals(5, client.decr("decrKey", 10, 15)); - assertEquals(5, cache().dataStructures().atomicLong("decrKey", 0, true).get()); + assertEquals(5, grid(0).atomicLong("decrKey", 0, true).get()); Assert.assertEquals(2, client.decr("decrKey", 3, 0)); - assertEquals(2, cache().dataStructures().atomicLong("decrKey", 0, true).get()); + assertEquals(2, grid(0).atomicLong("decrKey", 0, true).get()); } /** 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 2163563..a94b8b2 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -118,9 +118,8 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS * @param success Success flag. * @return Regex pattern for JSON. */ - private String cacheIntegerPattern(int res, boolean success) { - return "\\{\\\"affinityNodeId\\\":\\\"\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}\\\"\\," + - "\\\"error\\\":\\\"\\\"\\," + + private String integerPattern(int res, boolean success) { + return "\\{\\\"error\\\":\\\"\\\"\\," + "\\\"response\\\":" + res + "\\," + "\\\"sessionToken\\\":\\\"\\\"," + "\\\"successStatus\\\":" + (success ? 
0 : 1) + "\\}"; @@ -493,18 +492,18 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS assertNotNull(ret); assertTrue(!ret.isEmpty()); - jsonEquals(ret, cacheIntegerPattern(5, true)); + jsonEquals(ret, integerPattern(5, true)); - assertEquals(5, cache().dataStructures().atomicLong("incrKey", 0, true).get()); + assertEquals(5, grid(0).atomicLong("incrKey", 0, true).get()); ret = content(F.asMap("cmd", "incr", "key", "incrKey", "delta", "10")); assertNotNull(ret); assertTrue(!ret.isEmpty()); - jsonEquals(ret, cacheIntegerPattern(15, true)); + jsonEquals(ret, integerPattern(15, true)); - assertEquals(15, cache().dataStructures().atomicLong("incrKey", 0, true).get()); + assertEquals(15, grid(0).atomicLong("incrKey", 0, true).get()); } /** @@ -516,18 +515,18 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS assertNotNull(ret); assertTrue(!ret.isEmpty()); - jsonEquals(ret, cacheIntegerPattern(5, true)); + jsonEquals(ret, integerPattern(5, true)); - assertEquals(5, cache().dataStructures().atomicLong("decrKey", 0, true).get()); + assertEquals(5, grid(0).atomicLong("decrKey", 0, true).get()); ret = content(F.asMap("cmd", "decr", "key", "decrKey", "delta", "3")); assertNotNull(ret); assertTrue(!ret.isEmpty()); - jsonEquals(ret, cacheIntegerPattern(2, true)); + jsonEquals(ret, integerPattern(2, true)); - assertEquals(2, cache().dataStructures().atomicLong("decrKey", 0, true).get()); + assertEquals(2, grid(0).atomicLong("decrKey", 0, true).get()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/RestMemcacheProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/RestMemcacheProtocolSelfTest.java 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/RestMemcacheProtocolSelfTest.java index a996352..79b22a3 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/RestMemcacheProtocolSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/RestMemcacheProtocolSelfTest.java @@ -243,38 +243,46 @@ public class RestMemcacheProtocolSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testIncrement() throws Exception { - assertEquals(15L, client().cacheIncrement(null, "key", 10L, 5L)); - assertEquals(15L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(18L, client().cacheIncrement(null, "key", 20L, 3L)); - assertEquals(18L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(20L, client().cacheIncrement(null, "key", null, 2L)); - assertEquals(20L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - - assertEquals(15L, client().cacheIncrement(CACHE_NAME, "key", 10L, 5L)); - assertEquals(15L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(18L, client().cacheIncrement(CACHE_NAME, "key", 20L, 3L)); - assertEquals(18L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(20L, client().cacheIncrement(CACHE_NAME, "key", null, 2L)); - assertEquals(20L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(15L, client().increment("key", 10L, 5L)); + assertEquals(15L, grid().atomicLong("key", 0, true).get()); + + assertEquals(18L, client().increment("key", 20L, 3L)); + assertEquals(18L, grid().atomicLong("key", 0, true).get()); + + assertEquals(20L, client().increment("key", null, 2L)); + assertEquals(20L, grid().atomicLong("key", 0, true).get()); + + assertEquals(15L, client().increment("key1", 10L, 5L)); + assertEquals(15L, 
grid().atomicLong("key1", 0, true).get()); + + assertEquals(18L, client().increment("key1", 20L, 3L)); + assertEquals(18L, grid().atomicLong("key1", 0, true).get()); + + assertEquals(20L, client().increment("key1", null, 2L)); + assertEquals(20L, grid().atomicLong("key1", 0, true).get()); } /** * @throws Exception If failed. */ public void testDecrement() throws Exception { - assertEquals(15L, client().cacheDecrement(null, "key", 20L, 5L)); - assertEquals(15L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(12L, client().cacheDecrement(null, "key", 20L, 3L)); - assertEquals(12L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(10L, client().cacheDecrement(null, "key", null, 2L)); - assertEquals(10L, grid().cache(null).dataStructures().atomicLong("key", 0, true).get()); - - assertEquals(15L, client().cacheDecrement(CACHE_NAME, "key", 20L, 5L)); - assertEquals(15L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(12L, client().cacheDecrement(CACHE_NAME, "key", 20L, 3L)); - assertEquals(12L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); - assertEquals(10L, client().cacheDecrement(CACHE_NAME, "key", null, 2L)); - assertEquals(10L, grid().cache(CACHE_NAME).dataStructures().atomicLong("key", 0, true).get()); + assertEquals(15L, client().decrement("key", 20L, 5L)); + assertEquals(15L, grid().atomicLong("key", 0, true).get()); + + assertEquals(12L, client().decrement("key", 20L, 3L)); + assertEquals(12L, grid().atomicLong("key", 0, true).get()); + + assertEquals(10L, client().decrement("key", null, 2L)); + assertEquals(10L, grid().atomicLong("key", 0, true).get()); + + assertEquals(15L, client().decrement("key1", 20L, 5L)); + assertEquals(15L, grid().atomicLong("key1", 0, true).get()); + + assertEquals(12L, client().decrement("key1", 20L, 3L)); + assertEquals(12L, grid().atomicLong("key1", 0, true).get()); + + 
assertEquals(10L, client().decrement("key1", null, 2L)); + assertEquals(10L, grid().atomicLong("key1", 0, true).get()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/TestMemcacheClient.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/TestMemcacheClient.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/TestMemcacheClient.java index e7c625f..b82b346 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/TestMemcacheClient.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/TestMemcacheClient.java @@ -535,33 +535,31 @@ final class TestMemcacheClient { } /** - * @param cacheName Cache name. * @param key Key. * @param init Initial value (optional). * @param incr Amount to add. * @return New value. * @throws IgniteCheckedException In case of error. */ - public <K> long cacheIncrement(@Nullable String cacheName, K key, @Nullable Long init, long incr) + public <K> long increment(K key, @Nullable Long init, long incr) throws IgniteCheckedException { assert key != null; - return makeRequest(Command.INCREMENT, cacheName, key, null, incr, init).<Long>getObject(); + return makeRequest(Command.INCREMENT, null, key, null, incr, init).<Long>getObject(); } /** - * @param cacheName Cache name. * @param key Key. * @param init Initial value (optional). * @param decr Amount to subtract. * @return New value. * @throws IgniteCheckedException In case of error. 
*/ - public <K> long cacheDecrement(@Nullable String cacheName, K key, @Nullable Long init, long decr) + public <K> long decrement(K key, @Nullable Long init, long decr) throws IgniteCheckedException { assert key != null; - return makeRequest(Command.DECREMENT, cacheName, key, null, decr, init).<Long>getObject(); + return makeRequest(Command.DECREMENT, null, key, null, decr, init).<Long>getObject(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java b/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java index 8341795..1e599ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridGainEx.java @@ -1916,7 +1916,7 @@ public class GridGainEx { int cacheIdx = 1; if (hasHadoop) - copies[cacheIdx] = CU.hadoopSystemCache(); + copies[cacheIdx++] = CU.hadoopSystemCache(); if (hasAtomics) copies[cacheIdx] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 8218b30..ca7c614 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -68,9 +68,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali 
/** Cache queries. */ private CacheQueries<K, V> qry; - /** Data structures. */ - private CacheDataStructures dataStructures; - /** Affinity. */ private CacheAffinity<K> aff; @@ -99,7 +96,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali cache = ctx.cache(); qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries()); - dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures()); aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity()); } @@ -151,7 +147,7 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali /** {@inheritDoc} */ @Override public CacheDataStructures dataStructures() { - return dataStructures; + return null; } /** {@inheritDoc} */ @@ -1886,7 +1882,6 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali cache = ctx.cache(); qry = new GridCacheQueriesProxy<>(ctx, prj, (GridCacheQueriesEx<K, V>)delegate.queries()); - dataStructures = new GridCacheDataStructuresProxy<>(ctx, ctx.cache().dataStructures()); aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index c122a54..4073d00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -19,12 +19,16 @@ package 
org.apache.ignite.internal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; +import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -33,14 +37,23 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.internal.GridClosureCallMode.*; + /** * */ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, V> { + /** Sets map. */ + private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap; + /** Set keys used for set iteration. */ private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap = new ConcurrentHashMap8<>(); + /** Queues map. */ + private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; + /** Queue header view. */ private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; @@ -59,6 +72,15 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, /** Init flag. 
*/ private boolean initFlag; + /** + * + */ + public CacheDataStructuresManager() { + queuesMap = new ConcurrentHashMap8<>(10); + + setsMap = new ConcurrentHashMap8<>(10); + } + /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { try { @@ -83,6 +105,9 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, U.warn(log, "Failed to cancel queue header query.", e); } } + + for (GridCacheQueueProxy q : queuesMap.values()) + q.delegate().onKernalStop(); } /** @@ -106,7 +131,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - @Nullable public GridCacheQueueHeader queue(final String name, + @Nullable public <T> GridCacheQueueProxy<T> queue(final String name, final int cap, boolean colloc, final boolean create) @@ -114,6 +139,34 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, { waitInitialization(); + // Non collocated mode enabled only for PARTITIONED cache. + final boolean colloc0 = create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); + + if (cctx.atomic()) + return queue0(name, cap, colloc0, create); + + return CU.outTx(new Callable<GridCacheQueueProxy<T>>() { + @Nullable @Override public GridCacheQueueProxy<T> call() throws Exception { + return queue0(name, cap, colloc0, create); + } + }, cctx); + } + + /** + * @param name Queue name. + * @param cap Capacity. + * @param colloc Collocated flag. + * @param create Create flag. + * @return Queue header. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + @Nullable public <T> GridCacheQueueProxy<T> queue0(final String name, + final int cap, + boolean colloc, + final boolean create) + throws IgniteCheckedException + { cctx.gate().enter(); try { @@ -154,9 +207,24 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, for (GridCacheContinuousQueryEntry e : entries) { GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); - GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); - cctx.kernalContext().dataStructures().onQueueUpdated(key, hdr, oldHdr); + for (final GridCacheQueueProxy queue : queuesMap.values()) { + if (queue.name().equals(key.queueName())) { + if (hdr == null) { + GridCacheQueueHeader oldHdr = (GridCacheQueueHeader)e.getOldValue(); + + assert oldHdr != null; + + if (oldHdr.id().equals(queue.delegate().id())) { + queue.delegate().onRemoved(false); + + queuesMap.remove(queue.delegate().id()); + } + } + else + queue.delegate().onHeaderChanged(hdr); + } + } } return true; @@ -174,7 +242,19 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, true); } - return hdr; + GridCacheQueueProxy queue = queuesMap.get(hdr.id()); + + if (queue == null) { + queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? new GridAtomicCacheQueueImpl<>(name, hdr, cctx) : + new GridTransactionalCacheQueueImpl<>(name, hdr, cctx)); + + GridCacheQueueProxy old = queuesMap.putIfAbsent(hdr.id(), queue); + + if (old != null) + queue = old; + } + + return queue; } finally { cctx.gate().leave(); @@ -213,6 +293,85 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** + * @param name Set name. + * @param colloc Collocated flag. + * @param create Create flag. + * @return Set. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public <T> IgniteSet<T> set(final String name, + boolean colloc, + final boolean create) + throws IgniteCheckedException + { + // Non collocated mode enabled only for PARTITIONED cache. + final boolean colloc0 = + create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || colloc); + + if (cctx.atomic()) + return set0(name, colloc0, create); + + return CU.outTx(new Callable<IgniteSet<T>>() { + @Nullable @Override public IgniteSet<T> call() throws Exception { + return set0(name, colloc0, create); + } + }, cctx); + } + + /** + * @param name Name of set. + * @param collocated Collocation flag. + * @param create If {@code true} set will be created in case it is not in cache. + * @return Set. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + @Nullable private <T> IgniteSet<T> set0(String name, + boolean collocated, + boolean create) + throws IgniteCheckedException + { + cctx.gate().enter(); + + try { + GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); + + GridCacheSetHeader hdr; + + GridCacheAdapter cache = cctx.cache(); + + if (create) { + hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); + + GridCacheSetHeader old = retryPutIfAbsent(cache, key, hdr); + + if (old != null) + hdr = old; + } + else + hdr = (GridCacheSetHeader)cache.get(key); + + if (hdr == null) + return null; + + GridCacheSetProxy<T> set = setsMap.get(hdr.id()); + + if (set == null) { + GridCacheSetProxy<T> old = setsMap.putIfAbsent(hdr.id(), + set = new GridCacheSetProxy<>(cctx, new GridCacheSetImpl<T>(cctx, name, hdr))); + + if (old != null) + set = old; + } + + return set; + } + finally { + cctx.gate().leave(); + } + } + + /** * @param id Set ID. * @return Data for given set. */ @@ -226,7 +385,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") - public void removeSetData(IgniteUuid setId, long topVer) throws IgniteCheckedException { + private void removeSetData(IgniteUuid setId, long topVer) throws IgniteCheckedException { boolean loc = cctx.isLocal(); GridCacheAffinityManager aff = cctx.affinity(); @@ -268,6 +427,67 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** + * @param name Set name. + * @return {@code True} if set was removed. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public boolean removeSet(String name) throws IgniteCheckedException { + GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); + + GridCache cache = cctx.cache(); + + GridCacheSetHeader hdr = retryRemove(cache, key); + + if (hdr == null) + return false; + + if (!cctx.isLocal()) { + while (true) { + long topVer = cctx.topologyVersionFuture().get(); + + Collection<ClusterNode> nodes = CU.affinityNodes(cctx, topVer); + + try { + cctx.closures().callAsyncNoFailover(BROADCAST, + new BlockSetCallable(cctx.name(), hdr.id()), + nodes, + true).get(); + } + catch (ClusterTopologyException e) { + if (log.isDebugEnabled()) + log.debug("BlockSet job failed, will retry: " + e); + + continue; + } + + try { + cctx.closures().callAsyncNoFailover(BROADCAST, + new RemoveSetDataCallable(cctx.name(), hdr.id(), topVer), + nodes, + true).get(); + } + catch (ClusterTopologyException e) { + if (log.isDebugEnabled()) + log.debug("RemoveSetData job failed, will retry: " + e); + + continue; + } + + if (cctx.topologyVersionFuture().get() == topVer) + break; + } + } + else { + blockSet(hdr.id()); + + cctx.dataStructures().removeSetData(hdr.id(), 0); + } + + return true; + } + + /** * @param key Set item key. * @param rmv {@code True} if item was removed. */ @@ -292,6 +512,52 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** + * @param setId Set ID. 
+ */ + @SuppressWarnings("unchecked") + private void blockSet(IgniteUuid setId) { + GridCacheSetProxy set = setsMap.remove(setId); + + if (set != null) + set.blockOnRemove(); + } + + /** + * @param cache Cache. + * @param key Key. + * @param val Value. + * @throws IgniteCheckedException If failed. + * @return Previous value. + */ + @SuppressWarnings("unchecked") + @Nullable private <T> T retryPutIfAbsent(final GridCache cache, final Object key, final T val) + throws IgniteCheckedException { + return CacheDataStructuresProcessor.retry(log, new Callable<T>() { + @Nullable @Override public T call() throws Exception { + return (T) cache.putIfAbsent(key, val); + } + }); + } + + + /** + * @param cache Cache. + * @param key Key to remove. + * @throws IgniteCheckedException If failed. + * @return Removed value. + */ + @SuppressWarnings("unchecked") + @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException { + return CacheDataStructuresProcessor.retry(log, new Callable<T>() { + @Nullable + @Override + public T call() throws Exception { + return (T) cache.remove(key); + } + }); + } + + /** * @param cache Cache. * @param keys Keys to remove. * @throws IgniteCheckedException If failed. @@ -337,4 +603,150 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, // No-op. } } + + /** + * Waits for completion of all started set operations and blocks all subsequent operations. + */ + @GridInternal + private static class BlockSetCallable implements Callable<Void>, Externalizable { + /** */ + private static final long serialVersionUID = 0; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** */ + private IgniteUuid setId; + + /** + * Required by {@link Externalizable}. + */ + public BlockSetCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param setId Set ID. 
+ */ + private BlockSetCallable(String cacheName, IgniteUuid setId) { + this.cacheName = cacheName; + this.setId = setId; + } + + /** {@inheritDoc} */ + @Override public Void call() throws IgniteCheckedException { + assert ignite != null; + + GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); + + assert cache != null : cacheName; + + cache.context().dataStructures().blockSet(setId); + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, setId); + U.writeString(out, cacheName); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + setId = U.readGridUuid(in); + cacheName = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "BlockSetCallable [setId=" + setId + ']'; + } + } + + /** + * Removes set items. + */ + @GridInternal + private static class RemoveSetDataCallable implements Callable<Void>, Externalizable { + /** */ + private static final long serialVersionUID = 5053205121218843148L; + + /** Injected grid instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** */ + private IgniteUuid setId; + + /** */ + private long topVer; + + /** + * Required by {@link Externalizable}. + */ + public RemoveSetDataCallable() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param setId Set ID. + * @param topVer Topology version. 
+ */ + private RemoveSetDataCallable(String cacheName, IgniteUuid setId, long topVer) { + this.cacheName = cacheName; + this.setId = setId; + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public Void call() throws IgniteCheckedException { + assert ignite != null; + + GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); + + assert cache != null; + + GridCacheGateway gate = cache.context().gate(); + + gate.enter(); + + try { + cache.context().dataStructures().removeSetData(setId, topVer); + } + finally { + gate.leave(); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + U.writeGridUuid(out, setId); + out.writeLong(topVer); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + setId = U.readGridUuid(in); + topVer = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "RemoveSetCallable [setId=" + setId + ']'; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java index 0f0abe7..385cea7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CacheDataStructuresProcessor.java @@ -70,9 +70,6 @@ public final class CacheDataStructuresProcessor extends 
GridProcessorAdapter { /** Internal storage of all dataStructures items (sequence, atomic long etc.). */ private final ConcurrentMap<GridCacheInternal, GridCacheRemovable> dsMap; - /** Queues map. */ - private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; - /** Cache contains only {@code GridCacheAtomicValue}. */ private CacheProjection<GridCacheInternalKey, GridCacheAtomicLongValue> atomicLongView; @@ -88,9 +85,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { /** Cache contains only entry {@code GridCacheSequenceValue}. */ private CacheProjection<GridCacheInternalKey, GridCacheAtomicSequenceValue> seqView; - /** Sets map. */ - private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap; - /** */ private GridCacheContext atomicsCacheCtx; @@ -107,8 +101,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { super(ctx); dsMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY); - queuesMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY); - setsMap = new ConcurrentHashMap8<>(INITIAL_CAPACITY); atomicCfg = ctx.config().getAtomicConfiguration(); } @@ -149,14 +141,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } } - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - super.onKernalStop(cancel); - - for (GridCacheQueueProxy q : queuesMap.values()) - q.delegate().onKernalStop(); - } - /** * Gets a sequence from cache or creates one if it's not cached. 
* @@ -187,7 +171,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return val; return CU.outTx(new Callable<IgniteAtomicSequence>() { - @Override public IgniteAtomicSequence call() throws Exception { + @Override + public IgniteAtomicSequence call() throws Exception { try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); @@ -318,7 +303,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return atomicLong; return CU.outTx(new Callable<IgniteAtomicLong>() { - @Override public IgniteAtomicLong call() throws Exception { + @Override + public IgniteAtomicLong call() throws Exception { try (IgniteTx tx = CU.txStartInternal(atomicsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); @@ -376,8 +362,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ public final boolean removeAtomicLong(String name) throws IgniteCheckedException { assert name != null; - - checkAtomicsConfiguration(); + assert atomicsCacheCtx != null; atomicsCacheCtx.gate().enter(); @@ -473,7 +458,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } finally { atomicsCacheCtx.gate().leave(); - } } @@ -486,8 +470,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ public final boolean removeAtomicReference(String name) throws IgniteCheckedException { assert name != null; - - checkAtomicsConfiguration(); + assert atomicsCacheCtx != null; atomicsCacheCtx.gate().enter(); @@ -496,12 +479,8 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { return removeInternal(key, GridCacheAtomicReferenceValue.class); } - catch (Exception e) { - throw new IgniteCheckedException("Failed to remove atomic reference by 
name: " + name, e); - } finally { - atomicsCacheCtx.gate().enter(); - + atomicsCacheCtx.gate().leave(); } } @@ -583,7 +562,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } finally { atomicsCacheCtx.gate().leave(); - } } @@ -596,8 +574,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ public final boolean removeAtomicStamped(String name) throws IgniteCheckedException { assert name != null; - - checkAtomicsConfiguration(); + assert atomicsCacheCtx != null; atomicsCacheCtx.gate().enter(); @@ -641,55 +618,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { GridCacheAdapter cache = cacheForCollection(cfg); - GridCacheContext cctx = cache.context(); - - // Non collocated mode enabled only for PARTITIONED cache. - final boolean colloc = - create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || cfg.isCollocated()); - - GridCacheQueueHeader hdr = cctx.dataStructures().queue(name, cap, colloc, create); - - if (hdr == null) - return null; - - GridCacheQueueProxy queue = queuesMap.get(hdr.id()); - - if (queue == null) { - queue = new GridCacheQueueProxy(cctx, cctx.atomic() ? new GridAtomicCacheQueueImpl<>(name, hdr, cctx) : - new GridTransactionalCacheQueueImpl<>(name, hdr, cctx)); - - GridCacheQueueProxy old = queuesMap.putIfAbsent(hdr.id(), queue); - - if (old != null) - queue = old; - } - - return queue; - } - - /** - * @param key Queue header key. - * @param hdr Current queue header. - * @param oldHdr Previous queue header value. 
- */ - public void onQueueUpdated(GridCacheQueueHeaderKey key, - @Nullable GridCacheQueueHeader hdr, - @Nullable GridCacheQueueHeader oldHdr) { - for (final GridCacheQueueProxy queue : queuesMap.values()) { - if (queue.name().equals(key.queueName())) { - if (hdr == null) { - assert oldHdr != null; - - if (oldHdr.id().equals(queue.delegate().id())) { - queue.delegate().onRemoved(false); - - queuesMap.remove(queue.delegate().id()); - } - } - else - queue.delegate().onHeaderChanged(hdr); - } - } + return cache.context().dataStructures().queue(name, cap, create && cfg.isCollocated(), create); } /** @@ -789,8 +718,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { */ public boolean removeCountDownLatch(final String name) throws IgniteCheckedException { assert name != null; - - checkAtomicsConfiguration(); + assert atomicsCacheCtx != null; atomicsCacheCtx.gate().enter(); @@ -944,6 +872,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @return Set instance. * @throws IgniteCheckedException If failed. */ + @SuppressWarnings("unchecked") @Nullable public <T> IgniteSet<T> set(final String name, @Nullable IgniteCollectionConfiguration cfg, final boolean create) @@ -955,148 +884,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { GridCacheAdapter cache = cacheForCollection(cfg); - final GridCacheContext cctx = cache.context(); - - // Non collocated mode enabled only for PARTITIONED cache. - final boolean colloc = - create && (cctx.cache().configuration().getCacheMode() != PARTITIONED || cfg.isCollocated()); - - if (cctx.atomic()) - return set0(cctx, name, colloc, create); - - return CU.outTx(new Callable<IgniteSet<T>>() { - @Nullable @Override public IgniteSet<T> call() throws Exception { - return set0(cctx, name, colloc, create); - } - }, cctx); - } - - /** - * @param cctx Cache context. - * @param name Name of set. - * @param collocated Collocation flag. 
- * @param create If {@code true} set will be created in case it is not in cache. - * @return Set. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - @Nullable private <T> IgniteSet<T> set0(GridCacheContext cctx, - String name, - boolean collocated, - boolean create) - throws IgniteCheckedException - { - cctx.gate().enter(); - - try { - GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); - - GridCacheSetHeader hdr; - - GridCacheAdapter cache = cctx.cache(); - - if (create) { - hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated); - - GridCacheSetHeader old = retryPutIfAbsent(cache, key, hdr); - - if (old != null) - hdr = old; - } - else - hdr = (GridCacheSetHeader)cache.get(key); - - if (hdr == null) - return null; - - GridCacheSetProxy<T> set = setsMap.get(hdr.id()); - - if (set == null) { - GridCacheSetProxy<T> old = setsMap.putIfAbsent(hdr.id(), - set = new GridCacheSetProxy<>(cctx, new GridCacheSetImpl<T>(cctx, name, hdr))); - - if (old != null) - set = old; - } - - return set; - } - finally { - cctx.gate().leave(); - } - } - - /** - * @param cctx Cache context. - * @param name Set name. - * @return {@code True} if set was removed. - * @throws IgniteCheckedException If failed. 
- */ - @SuppressWarnings("unchecked") - public boolean removeSet(GridCacheContext cctx, String name) throws IgniteCheckedException { - GridCacheSetHeaderKey key = new GridCacheSetHeaderKey(name); - - GridCache cache = cctx.cache(); - - GridCacheSetHeader hdr = retryRemove(cache, key); - - if (hdr == null) - return false; - - if (!cctx.isLocal()) { - while (true) { - long topVer = cctx.topologyVersionFuture().get(); - - Collection<ClusterNode> nodes = CU.affinityNodes(cctx, topVer); - - try { - cctx.closures().callAsyncNoFailover(BROADCAST, - new BlockSetCallable(hdr.id()), - nodes, - true).get(); - } - catch (ClusterTopologyException e) { - if (log.isDebugEnabled()) - log.debug("BlockSet job failed, will retry: " + e); - - continue; - } - - try { - cctx.closures().callAsyncNoFailover(BROADCAST, - new RemoveSetDataCallable(cctx.name(), hdr.id(), topVer), - nodes, - true).get(); - } - catch (ClusterTopologyException e) { - if (log.isDebugEnabled()) - log.debug("RemoveSetData job failed, will retry: " + e); - - continue; - } - - if (cctx.topologyVersionFuture().get() == topVer) - break; - } - } - else { - blockSet(hdr.id()); - - cctx.dataStructures().removeSetData(hdr.id(), 0); - } - - return true; - } - - /** - * @param setId Set ID. - */ - @SuppressWarnings("unchecked") - private void blockSet(IgniteUuid setId) { - GridCacheSetProxy set = setsMap.remove(setId); - - if (set != null) - set.blockOnRemove(); + return cache.context().dataStructures().set(name, create ? cfg.isCollocated() : false, create); } /** @@ -1136,38 +924,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * @param cache Cache. - * @param key Key to remove. - * @throws IgniteCheckedException If failed. - * @return Removed value. 
- */ - @SuppressWarnings("unchecked") - @Nullable private <T> T retryRemove(final GridCache cache, final Object key) throws IgniteCheckedException { - return retry(log, new Callable<T>() { - @Nullable @Override public T call() throws Exception { - return (T)cache.remove(key); - } - }); - } - - /** - * @param cache Cache. - * @param key Key. - * @param val Value. - * @throws IgniteCheckedException If failed. - * @return Previous value. - */ - @SuppressWarnings("unchecked") - @Nullable private <T> T retryPutIfAbsent(final GridCache cache, final Object key, final T val) - throws IgniteCheckedException { - return retry(log, new Callable<T>() { - @Nullable @Override public T call() throws Exception { - return (T)cache.putIfAbsent(key, val); - } - }); - } - - /** * Tries to cast the object to expected type. * * @param obj Object which will be casted. @@ -1200,7 +956,7 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { * @return Cache to use for collection. */ private GridCacheAdapter cacheForCollection(IgniteCollectionConfiguration cfg) { - // TODO IGNITE-29: start collection internal cache with required configuration or use existing one. + // TODO IGNITE-29: start collection internal cache with required configuration or use existing cache. GridCacheAdapter cache = ctx.cache().internalCache("TEST_COLLECTION_CACHE"); if (cache == null) @@ -1228,141 +984,6 @@ public final class CacheDataStructuresProcessor extends GridProcessorAdapter { } /** - * Waits for completion of all started set operations and blocks all subsequent operations. - */ - @GridInternal - private static class BlockSetCallable implements Callable<Void>, Externalizable { - /** */ - private static final long serialVersionUID = 0; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteUuid setId; - - /** - * Required by {@link Externalizable}. - */ - public BlockSetCallable() { - // No-op. 
- } - - /** - * @param setId Set ID. - */ - private BlockSetCallable(IgniteUuid setId) { - this.setId = setId; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - assert ignite != null; - - ((GridKernal)ignite).context().dataStructures().blockSet(setId); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeGridUuid(out, setId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - setId = U.readGridUuid(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "BlockSetCallable [setId=" + setId + ']'; - } - } - - /** - * Removes set items. - */ - @GridInternal - private static class RemoveSetDataCallable implements Callable<Void>, Externalizable { - /** */ - private static final long serialVersionUID = 5053205121218843148L; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private String cacheName; - - /** */ - private IgniteUuid setId; - - /** */ - private long topVer; - - /** - * Required by {@link Externalizable}. - */ - public RemoveSetDataCallable() { - // No-op. - } - - /** - * @param cacheName Cache name. - * @param setId Set ID. - * @param topVer Topology version. 
- */ - private RemoveSetDataCallable(String cacheName, IgniteUuid setId, long topVer) { - this.cacheName = cacheName; - this.setId = setId; - this.topVer = topVer; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - assert ignite != null; - - GridCacheAdapter cache = ((GridKernal)ignite).context().cache().internalCache(cacheName); - - assert cache != null; - - GridCacheGateway gate = cache.context().gate(); - - gate.enter(); - - try { - cache.context().dataStructures().removeSetData(setId, topVer); - } - finally { - gate.leave(); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - U.writeGridUuid(out, setId); - out.writeLong(topVer); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - setId = U.readGridUuid(in); - topVer = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "RemoveSetCallable [setId=" + setId + ']'; - } - } - - /** * */ static enum DataStructureType { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index 8d6282d..ba69023 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -424,7 +424,7 @@ public abstract class GridCacheQueueAdapter<T> extends 
AbstractCollection<T> imp * * @param throw0 If {@code true} then throws {@link org.apache.ignite.cache.datastructures.CacheDataStructureRemovedRuntimeException}. */ - void onRemoved(boolean throw0) { + public void onRemoved(boolean throw0) { rmvd = true; releaseSemaphores(); @@ -449,7 +449,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * @param hdr Queue header. */ - void onHeaderChanged(GridCacheQueueHeader hdr) { + public void onHeaderChanged(GridCacheQueueHeader hdr) { if (!hdr.empty()) { readSem.drainPermits(); readSem.release(hdr.size()); @@ -466,7 +466,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * Grid stop callback. */ - void onKernalStop() { + public void onKernalStop() { releaseSemaphores(); } @@ -481,7 +481,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp /** * @return Queue unique ID. */ - IgniteUuid id() { + public IgniteUuid id() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f414168..5144e4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -335,7 +335,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite if (rmvd) return; - ctx.kernalContext().dataStructures().removeSet(ctx, name); + ctx.dataStructures().removeSet(name); } catch (IgniteCheckedException e) { throw new IgniteException(e); 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index f303311..6704e9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -81,7 +81,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { /** * Remove callback. */ - void blockOnRemove() { + public void blockOnRemove() { delegate.removed(true); busyLock.block(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 0735447..6ece8b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.rest.handlers.datastructures.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; @@ -255,6 +256,7 @@ public class 
GridRestProcessor extends GridProcessorAdapter { addHandler(new GridVersionCommandHandler(ctx)); addHandler(new GridLogCommandHandler(ctx)); addHandler(new GridPortableMetadataHandler(ctx)); + addHandler(new DataStructuresCommandHandler(ctx)); // Start protocols. startTcpProtocol(); @@ -560,12 +562,17 @@ public class GridRestProcessor extends GridProcessorAdapter { break; + // TODO IGNITE-6. + case CACHE_INCREMENT: + case CACHE_DECREMENT: + perm = GridSecurityPermission.CACHE_PUT; + + break; + case CACHE_PUT: case CACHE_ADD: case CACHE_PUT_ALL: case CACHE_REPLACE: - case CACHE_INCREMENT: - case CACHE_DECREMENT: case CACHE_CAS: case CACHE_APPEND: case CACHE_PREPEND: @@ -625,7 +632,7 @@ public class GridRestProcessor extends GridProcessorAdapter { log.debug("Added REST command handler: " + hnd); for (GridRestCommand cmd : hnd.supportedCommands()) { - assert !handlers.containsKey(cmd); + assert !handlers.containsKey(cmd) : cmd; handlers.put(cmd, hnd); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 9fbc6ac..03cffbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -61,8 +61,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { CACHE_REMOVE, CACHE_REMOVE_ALL, CACHE_REPLACE, - CACHE_INCREMENT, - CACHE_DECREMENT, CACHE_CAS, CACHE_APPEND, CACHE_PREPEND, @@ -255,20 +253,6 @@ public class 
GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } - case CACHE_INCREMENT: { - fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, - new IncrementCommand(key, req0)); - - break; - } - - case CACHE_DECREMENT: { - fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, - new DecrementCommand(key, req0)); - - break; - } - case CACHE_CAS: { final Object val1 = req0.value(); final Object val2 = req0.value2(); @@ -407,39 +391,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** - * Handles increment and decrement commands. - * - * @param cache Cache. - * @param key Key. - * @param req Request. - * @param decr Whether to decrement (increment otherwise). - * @return Future of operation result. - * @throws IgniteCheckedException In case of error. - */ - private static IgniteFuture<?> incrementOrDecrement(CacheProjection<Object, Object> cache, String key, - GridRestCacheRequest req, final boolean decr) throws IgniteCheckedException { - assert cache != null; - assert key != null; - assert req != null; - - Long init = req.initial(); - Long delta = req.delta(); - - if (delta == null) - throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("delta")); - - final IgniteAtomicLong l = cache.cache().dataStructures().atomicLong(key, init != null ? init : 0, true); - - final Long d = delta; - - return ((GridKernal)cache.gridProjection().ignite()).context().closure().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { - return l.addAndGet(decr ? -d : d); - } - }, false); - } - - /** * Handles append and prepend commands. * * @param ctx Kernal context. 
@@ -1023,59 +974,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** */ - private static class IncrementCommand extends CacheCommand { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final Object key; - - /** */ - private final GridRestCacheRequest req; - - /** - * @param key Key. - * @param req Operation request. - */ - IncrementCommand(Object key, GridRestCacheRequest req) { - this.key = key; - this.req = req; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) - throws IgniteCheckedException { - return incrementOrDecrement(c, (String)key, req, false); - } - } - - /** */ - private static class DecrementCommand extends CacheCommand { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final Object key; - - /** */ - private final GridRestCacheRequest req; - - /** - * @param key Key. - * @param req Operation request. 
- */ - DecrementCommand(Object key, GridRestCacheRequest req) { - this.key = key; - this.req = req; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> applyx(CacheProjection<Object, Object> c, GridKernalContext ctx) throws IgniteCheckedException { - return incrementOrDecrement(c, (String)key, req, true); - } - } - - /** */ private static class AppendCommand extends CacheProjectionCommand { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e3c81b4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DatastructuresCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DatastructuresCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DatastructuresCommandHandler.java new file mode 100644 index 0000000..b4cb672 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DatastructuresCommandHandler.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.datastructures; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * REST command handler for the {@code CACHE_INCREMENT} and {@code CACHE_DECREMENT} commands, served through an {@link IgniteAtomicLong}. + */ +public class DataStructuresCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands: {@code CACHE_INCREMENT} and {@code CACHE_DECREMENT}. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList( + CACHE_INCREMENT, + CACHE_DECREMENT + ); + /** + * @param ctx Kernal context, passed on to the adapter superclass. + */ + public DataStructuresCommandHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert SUPPORTED_COMMANDS.contains(req.command()) : req.command(); + + return incrementOrDecrement((DataStructuresRequest)req).chain(new CX1<IgniteFuture<?>, GridRestResponse>() { + @Override public GridRestResponse applyx(IgniteFuture<?> fut) throws IgniteCheckedException { + GridRestResponse res = new GridRestResponse(); + + res.setResponse(fut.get()); + + return res; + } + }); + } + /** + * Handles increment and decrement commands: adds the request delta (negated for {@code CACHE_DECREMENT}) to the atomic long named by the request key, using 0 as the initial value when the request carries none. + * + * @param req Request; if its key or delta is {@code null}, a {@code GridFinishedFuture} built with a missing-parameter exception is returned instead. + * @return Future of operation result (the value returned by {@code addAndGet}).
+ */ + private IgniteFuture<?> incrementOrDecrement(final DataStructuresRequest req) { + assert req != null; + assert req.command() == CACHE_INCREMENT || req.command() == CACHE_DECREMENT : req.command(); + + if (req.key() == null) { + IgniteCheckedException err = + new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("key")); + + return new GridFinishedFuture(ctx, err); + } + else if (req.delta() == null) { + IgniteCheckedException err = + new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("delta")); + + return new GridFinishedFuture(ctx, err); + } + + return ctx.closure().callLocalSafe(new Callable<Object>() { + @Override public Object call() throws Exception { + Long init = req.initial(); + Long delta = req.delta(); + + boolean decr = req.command() == CACHE_DECREMENT; + + String key = (String)req.key(); + + IgniteAtomicLong l = ctx.grid().atomicLong(key, init != null ? init : 0, true); + + return l.addAndGet(decr ? -delta : delta); + } + }, false); + } +}