Repository: incubator-ignite Updated Branches: refs/heads/ignite-961-master [created] 952ac7246
#ignite-961-master: add rest commands. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/952ac724 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/952ac724 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/952ac724 Branch: refs/heads/ignite-961-master Commit: 952ac7246add47ea315a8dafe7ae06d1d46c63ff Parents: f557728 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Mon Jul 20 11:21:57 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Mon Jul 20 11:21:57 2015 +0300 ---------------------------------------------------------------------- .../rest/AbstractRestProcessorSelfTest.java | 1 + .../JettyRestProcessorAbstractSelfTest.java | 458 +++++++++++++++++++ .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/rest/GridRestCommand.java | 53 ++- .../processors/rest/GridRestProcessor.java | 14 + .../handlers/cache/GridCacheCommandHandler.java | 415 ++++++++++++++++- .../rest/handlers/query/CacheQueryResult.java | 101 ++++ .../handlers/query/QueryCommandHandler.java | 291 ++++++++++++ .../version/GridVersionCommandHandler.java | 14 +- .../rest/request/RestSqlQueryRequest.java | 125 +++++ .../http/jetty/GridJettyRestHandler.java | 68 ++- 11 files changed, 1509 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 4f1969f..8310b0f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -84,6 +84,7 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { CacheConfiguration ccfg = defaultCacheConfiguration(); ccfg.setStatisticsEnabled(true); + ccfg.setIndexedTypes(String.class, String.class); cfg.setCacheConfiguration(ccfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index d5a3cc1..c6c8195 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -17,12 +17,20 @@ package org.apache.ignite.internal.processors.rest; +import net.sf.json.*; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.*; import java.util.regex.*; import static org.apache.ignite.IgniteSystemProperties.*; @@ -40,6 +48,8 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS System.setProperty(IGNITE_JETTY_PORT, Integer.toString(restPort())); super.beforeTestsStarted(); + + initCache(); } /** {@inheritDoc} */ @@ -50,6 +60,11 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS } /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + grid(0).cache(null).clear(); + } + + /** {@inheritDoc} */ @Override protected int gridCount() { return GRID_CNT; } @@ -115,6 +130,18 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS } /** + * @param err Error. + * @return Regex pattern for JSON. + */ + private String errorPattern(String err) { + return "\\{" + + "\\\"error\\\":\\\"" + err + "\\\"\\," + + "\\\"response\\\":null\\," + + "\\\"sessionToken\\\":\\\"\\\"," + + "\\\"successStatus\\\":" + 1 + "\\}"; + } + + /** * @param res Response. * @param success Success flag. * @return Regex pattern for JSON. @@ -144,6 +171,19 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS * @param success Success flag. * @return Regex pattern for JSON. */ + private String cacheBulkPattern(int res, boolean success) { + return "\\{\\\"affinityNodeId\\\":\\\"\\\"\\," + + "\\\"error\\\":\\\"\\\"\\," + + "\\\"response\\\":" + res + "\\," + + "\\\"sessionToken\\\":\\\"\\\"," + + "\\\"successStatus\\\":" + (success ? 0 : 1) + "\\}"; + } + + /** + * @param res Response. + * @param success Success flag. + * @return Regex pattern for JSON. + */ private String cachePattern(boolean res, boolean success) { return "\\{\\\"affinityNodeId\\\":\\\"\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}\\\"\\," + "\\\"error\\\":\\\"\\\"\\," + @@ -221,6 +261,59 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS /** * @throws Exception If failed. */ + public void testCacheSize() throws Exception { + jcache().removeAll(); + + jcache().put("getKey", "getVal"); + + String ret = content(F.asMap("cmd", "cachesize")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + info("Size command result: " + ret); + + jsonEquals(ret, cacheBulkPattern(1, true)); + } + + /** + * @throws Exception If failed. + */ + public void testIgniteName() throws Exception { + String ret = content(F.asMap("cmd", "name")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + info("Name command result: " + ret); + + jsonEquals(ret, stringPattern(getTestGridName(0), true)); + } + + /** + * @throws Exception If failed. + */ + public void testGetOrCreateCache() throws Exception { + String ret = content(F.asMap("cmd", "getorcreatecache", "cacheName", "testCache")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + info("Name command result: " + ret); + + grid(0).cache("testCache").put("1", "1"); + + ret = content(F.asMap("cmd", "destroycache", "cacheName", "testCache")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + assertNull(grid(0).cache("testCache")); + } + + /** + * @throws Exception If failed. + */ public void testGetAll() throws Exception { jcache().put("getKey1", "getVal1"); jcache().put("getKey2", "getVal2"); @@ -241,6 +334,174 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS /** * @throws Exception If failed. */ + public void testIncorrectPut() throws Exception { + String ret = content(F.asMap("cmd", "put", "key", "key0")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + jsonEquals(ret, errorPattern("Failed to find mandatory parameter in request: val")); + } + + /** + * @throws Exception If failed. + */ + public void testContainsKey() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "containskey", "key", "key0")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(true, true)); + } + + /** + * @throws Exception If failed. + */ + public void testContainesKeys() throws Exception { + grid(0).cache(null).put("key0", "val0"); + grid(0).cache(null).put("key1", "val1"); + + String ret = content(F.asMap("cmd", "containskeys", "k1", "key0", "k2", "key1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cacheBulkPattern(true, true)); + } + + /** + * @throws Exception If failed. + */ + public void testGetAndPut() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "getandput", "key", "key0", "val", "val1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern("val0", true)); + + assertEquals("val1", grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testGetAndPutIfAbsent() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "getandputifabsent", "key", "key0", "val", "val1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern("val0", true)); + + assertEquals("val0", grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testPutIfAbsent2() throws Exception { + String ret = content(F.asMap("cmd", "putifabsent", "key", "key0", "val", "val1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(true, true)); + + assertEquals("val1", grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveValue() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "rmvvalue", "key", "key0", "val", "val1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(false, true)); + + assertEquals("val0", grid(0).cache(null).get("key0")); + + ret = content(F.asMap("cmd", "rmvvalue", "key", "key0", "val", "val0")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(true, true)); + + assertNull(grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testGetAndRemove() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "getandrmv", "key", "key0")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern("val0", true)); + + assertNull(grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testReplaceValue() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "repval", "key", "key0", "val", "val1", "val2", "val2")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(false, true)); + + assertEquals("val0", grid(0).cache(null).get("key0")); + + ret = content(F.asMap("cmd", "repval", "key", "key0", "val", "val1", "val2", "val0")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern(true, true)); + + assertEquals("val1", grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ + public void testGetAndReplace() throws Exception { + grid(0).cache(null).put("key0", "val0"); + + String ret = content(F.asMap("cmd", "getandreplace", "key", "key0", "val", "val1")); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + jsonEquals(ret, cachePattern("val0", true)); + + assertEquals("val1", grid(0).cache(null).get("key0")); + } + + /** + * @throws Exception If failed. + */ public void testPut() throws Exception { String ret = content(F.asMap("cmd", "put", "key", "putKey", "val", "putVal")); @@ -722,5 +983,202 @@ abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestProcessorS jsonEquals(ret, stringPattern(".+", true)); } + /** + * @throws Exception If failed. + */ + public void testQueryArgs() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map<String, String> params = new HashMap<>(); + params.put("cmd", "qryexecute"); + params.put("type", "Person"); + params.put("psz", "10"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(2, items.size()); + + for (int i = 0; i < GRID_CNT; ++i) { + Map<GridRestCommand, GridRestCommandHandler> handlers = + GridTestUtils.getFieldValue(grid(i).context().rest(), "handlers"); + + GridRestCommandHandler qryHnd = handlers.get(GridRestCommand.CLOSE_SQL_QUERY); + + ConcurrentHashMap<Long, Iterator> its = GridTestUtils.getFieldValue(qryHnd, "curs"); + + assertEquals(0, its.size()); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryClose() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map<String, String> params = new HashMap<>(); + params.put("cmd", "qryexecute"); + params.put("type", "Person"); + params.put("psz", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + boolean found = false; + + for (int i = 0; i < GRID_CNT; ++i) { + Map<GridRestCommand, GridRestCommandHandler> handlers = + GridTestUtils.getFieldValue(grid(i).context().rest(), "handlers"); + + GridRestCommandHandler qryHnd = handlers.get(GridRestCommand.CLOSE_SQL_QUERY); + + ConcurrentHashMap<Long, Iterator> its = GridTestUtils.getFieldValue(qryHnd, "curs"); + + found |= its.size() != 0; + } + + assertTrue(found); + + Integer qryId = (Integer)((Map)json.get("response")).get("queryId"); + + assertNotNull(qryId); + + ret = content(F.asMap("cmd", "qryclose", "cacheName", "person", "qryId", String.valueOf(qryId))); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + found = false; + + for (int i = 0; i < GRID_CNT; ++i) { + Map<GridRestCommand, GridRestCommandHandler> handlers = + GridTestUtils.getFieldValue(grid(i).context().rest(), "handlers"); + + GridRestCommandHandler qryHnd = handlers.get(GridRestCommand.CLOSE_SQL_QUERY); + + ConcurrentHashMap<Long, Iterator> its = GridTestUtils.getFieldValue(qryHnd, "curs"); + + found |= its.size() != 0; + } + + assertFalse(found); + } + protected abstract String signature() throws Exception; + + /** + * Init cache. + */ + private void initCache() { + CacheConfiguration<Integer, Person> personCacheCfg = new CacheConfiguration<>("person"); + personCacheCfg.setIndexedTypes(Integer.class, Person.class); + + IgniteCache<Integer, Person> personCache = grid(0).getOrCreateCache(personCacheCfg); + + personCache.clear(); + + Person p1 = new Person("John", "Doe", 2000); + Person p2 = new Person("Jane", "Doe", 1000); + Person p3 = new Person("John", "Smith", 1000); + Person p4 = new Person("Jane", "Smith", 2000); + + personCache.put(p1.getId(), p1); + personCache.put(p2.getId(), p2); + personCache.put(p3.getId(), p3); + personCache.put(p4.getId(), p4); + + SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "salary > ? and salary <= ?"); + + qry.setArgs(1000, 2000); + + assertEquals(2, personCache.query(qry).getAll().size()); + } + + /** + * Person class. + */ + public static class Person implements Serializable { + /** Person id. */ + private static int PERSON_ID = 0; + + /** Person ID (indexed). */ + @QuerySqlField(index = true) + private Integer id; + + /** First name (not-indexed). */ + @QuerySqlField + private String firstName; + + /** Last name (not indexed). */ + @QuerySqlField + private String lastName; + + /** Salary (indexed). */ + @QuerySqlField(index = true) + private double salary; + + /** + * @param firstName First name. + * @param lastName Last name. + * @param salary Salary. + */ + Person(String firstName, String lastName, double salary) { + id = PERSON_ID++; + + this.firstName = firstName; + this.lastName = lastName; + this.salary = salary; + } + + /** + * @return First name. + */ + public String getFirstName() { + return firstName; + } + + /** + * @return Last name. + */ + public String getLastName() { + return lastName; + } + /** + * @return Salary. + */ + public double getSalary() { + + return salary; + } + + /** + * @return Id. + */ + public Integer getId() { + return id; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d2a730a..cacadc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2284,7 +2284,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Put future. */ public IgniteInternalFuture<Boolean> putAsync(K key, V val, - @Nullable CacheEntryPredicate... filter) { + @Nullable CacheEntryPredicate... filter) { final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java index 62732f0..ab8929f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java @@ -33,12 +33,27 @@ public enum GridRestCommand { /** Get cached value. */ CACHE_GET("get"), + /** Contains cached value. */ + CACHE_CONTAINS_KEY("containskey"), + + /** Contains cached values. */ + CACHE_CONTAINS_KEYS("containskeys"), + /** Get several cached values. */ CACHE_GET_ALL("getall"), + /** Store value in cache and return previous value. */ + CACHE_GET_AND_PUT("getandput"), + + /** Store value in cache and return previous value. */ + CACHE_GET_AND_PUT_IF_ABSENT("getandputifabsent"), + /** Store value in cache. */ CACHE_PUT("put"), + /** Store value in cache. */ + CACHE_PUT_IF_ABSENT("putifabsent"), + /** Store value in cache if it doesn't exist. */ CACHE_ADD("add"), @@ -48,12 +63,24 @@ public enum GridRestCommand { /** Remove value from cache. */ CACHE_REMOVE("rmv"), + /** Remove value from cache. */ + CACHE_REMOVE_VALUE("rmvvalue"), + + /** Remove value from cache. */ + CACHE_GET_AND_REMOVE("getandrmv"), + /** Remove several values from cache. */ CACHE_REMOVE_ALL("rmvall"), /** Replace cache value only if there is currently a mapping for it. */ CACHE_REPLACE("rep"), + /** Replace cache value only if there is currently a mapping for it. */ + CACHE_REPLACE_VALUE("repval"), + + /** Replace cache value only if there is currently a mapping for it. */ + CACHE_GET_AND_REPLACE("getandreplace"), + /** Compare and set. */ CACHE_CAS("cas"), @@ -66,6 +93,9 @@ public enum GridRestCommand { /** Cache metrics. */ CACHE_METRICS("cache"), + /** Cache size. */ + CACHE_SIZE("cachesize"), + /** Increment. */ ATOMIC_INCREMENT("incr"), @@ -87,6 +117,9 @@ public enum GridRestCommand { /** Version. */ VERSION("version"), + /** Name. */ + NAME("name"), + /** Log. */ LOG("log"), @@ -94,7 +127,25 @@ public enum GridRestCommand { NOOP("noop"), /** Quit. */ - QUIT("quit"); + QUIT("quit"), + + /** Get or create cache. */ + GET_OR_CREATE_CACHE("getorcreatecache"), + + /** Stops dynamically started cache. */ + DESTROY_CACHE("destroycache"), + + /** Execute sql query. */ + EXECUTE_SQL_QUERY("qryexecute"), + + /** Execute sql fields query. */ + EXECUTE_SQL_FIELDS_QUERY("qryfieldsexecute"), + + /** Fetch query results. */ + FETCH_SQL_QUERY("qryfetch"), + + /** Close query. */ + CLOSE_SQL_QUERY("qryclose"); /** Enum values. */ private static final GridRestCommand[] VALS = values(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index 2d1d802..d454fd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.rest.client.message.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.handlers.cache.*; import org.apache.ignite.internal.processors.rest.handlers.datastructures.*; +import org.apache.ignite.internal.processors.rest.handlers.query.*; import org.apache.ignite.internal.processors.rest.handlers.task.*; import org.apache.ignite.internal.processors.rest.handlers.top.*; import org.apache.ignite.internal.processors.rest.handlers.version.*; @@ -254,6 +255,7 @@ public class GridRestProcessor extends GridProcessorAdapter { addHandler(new GridTopologyCommandHandler(ctx)); addHandler(new GridVersionCommandHandler(ctx)); addHandler(new DataStructuresCommandHandler(ctx)); + addHandler(new QueryCommandHandler(ctx)); // Start protocols. startTcpProtocol(); @@ -384,6 +386,8 @@ public class GridRestProcessor extends GridProcessorAdapter { if (interceptor != null && res.getResponse() != null) { switch (req.command()) { + case CACHE_CONTAINS_KEYS: + case CACHE_CONTAINS_KEY: case CACHE_GET: case CACHE_GET_ALL: case CACHE_PUT: @@ -527,6 +531,8 @@ public class GridRestProcessor extends GridProcessorAdapter { switch (req.command()) { case CACHE_GET: + case CACHE_CONTAINS_KEY: + case CACHE_CONTAINS_KEYS: case CACHE_GET_ALL: perm = SecurityPermission.CACHE_READ; name = ((GridRestCacheRequest)req).cacheName(); @@ -540,6 +546,11 @@ public class GridRestProcessor extends GridProcessorAdapter { case CACHE_CAS: case CACHE_APPEND: case CACHE_PREPEND: + case CACHE_GET_AND_PUT: + case CACHE_GET_AND_REPLACE: + case CACHE_GET_AND_PUT_IF_ABSENT: + case CACHE_PUT_IF_ABSENT: + case CACHE_REPLACE_VALUE: perm = SecurityPermission.CACHE_PUT; name = ((GridRestCacheRequest)req).cacheName(); @@ -547,6 +558,8 @@ public class GridRestProcessor extends GridProcessorAdapter { case CACHE_REMOVE: case CACHE_REMOVE_ALL: + case CACHE_GET_AND_REMOVE: + case CACHE_REMOVE_VALUE: perm = SecurityPermission.CACHE_REMOVE; name = ((GridRestCacheRequest)req).cacheName(); @@ -560,6 +573,7 @@ public class GridRestProcessor extends GridProcessorAdapter { break; case CACHE_METRICS: + case CACHE_SIZE: case TOPOLOGY: case NODE: case VERSION: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 1f24023..c20360a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -53,26 +53,46 @@ import static org.apache.ignite.transactions.TransactionIsolation.*; public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList( + DESTROY_CACHE, + GET_OR_CREATE_CACHE, + CACHE_CONTAINS_KEYS, + CACHE_CONTAINS_KEY, CACHE_GET, + CACHE_GET_AND_PUT, + CACHE_GET_AND_REPLACE, + CACHE_GET_AND_PUT_IF_ABSENT, + CACHE_PUT_IF_ABSENT, CACHE_GET_ALL, CACHE_PUT, CACHE_ADD, CACHE_PUT_ALL, CACHE_REMOVE, + CACHE_REMOVE_VALUE, + CACHE_REPLACE_VALUE, + CACHE_GET_AND_REMOVE, CACHE_REMOVE_ALL, CACHE_REPLACE, CACHE_CAS, CACHE_APPEND, CACHE_PREPEND, - CACHE_METRICS + CACHE_METRICS, + CACHE_SIZE ); /** Requests with required parameter {@code key}. */ private static final EnumSet<GridRestCommand> KEY_REQUIRED_REQUESTS = EnumSet.of( + CACHE_CONTAINS_KEY, CACHE_GET, + CACHE_GET_AND_PUT, + CACHE_GET_AND_REPLACE, + CACHE_GET_AND_PUT_IF_ABSENT, + CACHE_PUT_IF_ABSENT, CACHE_PUT, CACHE_ADD, CACHE_REMOVE, + CACHE_REMOVE_VALUE, + CACHE_REPLACE_VALUE, + CACHE_GET_AND_REMOVE, CACHE_REPLACE, ATOMIC_INCREMENT, ATOMIC_DECREMENT, @@ -137,6 +157,32 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { IgniteInternalFuture<GridRestResponse> fut; switch (cmd) { + case DESTROY_CACHE: { + fut = ctx.closure().callLocalSafe(new DestroyCacheCommand(ctx, cacheName)); + + break; + } + + case GET_OR_CREATE_CACHE: { + fut = ctx.closure().callLocalSafe(new GetOrCreateCacheCallable(ctx, cacheName)); + + break; + } + + case CACHE_CONTAINS_KEYS: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new ContainsKeysCommand(getKeys(req0))); + + break; + } + + case CACHE_CONTAINS_KEY: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new ContainsKeyCommand(key)); + + break; + } + case CACHE_GET: { fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, new GetCommand(key)); @@ -144,48 +190,51 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } - case CACHE_GET_ALL: { - Set<Object> keys = req0.values().keySet(); + case CACHE_GET_AND_PUT: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new GetAndPutCommand(key, getValue(req0))); - if (F.isEmpty(keys)) - throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("keys")); + break; + } - // HashSet wrapping for correct serialization - HashSet<Object> keys0 = new HashSet<>(); + case CACHE_GET_AND_REPLACE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new GetAndReplaceCommand(key, getValue(req0))); - for (Object getKey : keys) { - if (getKey == null) - throw new IgniteCheckedException("Failing getAll operation (null keys are not allowed)."); + break; + } - keys0.add(getKey); - } + case CACHE_GET_AND_PUT_IF_ABSENT: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new GetAndPutIfAbsentCommand(key, getValue(req0))); + + break; + } + case CACHE_PUT_IF_ABSENT: { fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, - new GetAllCommand(keys0)); + new PutIfAbsentCommand(key, getValue(req0))); break; } - case CACHE_PUT: { - final Object val = req0.value(); + case CACHE_GET_ALL: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new GetAllCommand(getKeys(req0))); - if (val == null) - throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + break; + } + case CACHE_PUT: { fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, new - PutCommand(key, ttl, val)); + PutCommand(key, ttl, getValue(req0))); break; } case CACHE_ADD: { - final Object val = req0.value(); - - if (val == null) - throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); - fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, - new AddCommand(key, ttl, val)); + new AddCommand(key, ttl, getValue(req0))); break; } @@ -220,6 +269,27 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } + case CACHE_REMOVE_VALUE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new RemoveValueCommand(key, getValue(req0))); + + break; + } + + case CACHE_REPLACE_VALUE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new ReplaceValueCommand(key, getValue(req0), req0.value2())); + + break; + } + + case CACHE_GET_AND_REMOVE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, + new GetAndRemoveCommand(key)); + + break; + } + case CACHE_REMOVE_ALL: { Map<Object, Object> map = req0.values(); @@ -274,6 +344,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } + case CACHE_SIZE: { + fut = executeCommand(req.destinationId(), req.clientId(), cacheName, key, new SizeCommand()); + + break; + } + default: throw new IllegalArgumentException("Invalid command for cache handler: " + req); } @@ -297,6 +373,44 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** + * @param req Request. + * @return Request keys. + * @throws IgniteCheckedException If incorrect keys are presented. + */ + private Set<Object> getKeys(GridRestCacheRequest req) throws IgniteCheckedException { + Set<Object> keys = req.values().keySet(); + + if (F.isEmpty(keys)) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("keys")); + + // HashSet wrapping for correct serialization + HashSet<Object> keys0 = new HashSet<>(); + + for (Object getKey : keys) { + if (getKey == null) + throw new IgniteCheckedException("Failing operation (null keys are not allowed)."); + + keys0.add(getKey); + } + + return keys0; + } + + /** + * @param req Request. + * @return Request value. + * @throws IgniteCheckedException If incorrect keys are presented. + */ + private Object getValue(GridRestCacheRequest req) throws IgniteCheckedException { + final Object val = req.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + return val; + } + + /** * Executes command on flagged cache projection. Checks {@code destId} to find * if command could be performed locally or routed to a remote node. * @@ -702,6 +816,48 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** */ + private static class ContainsKeyCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object key; + + /** + * @param key Key. + */ + ContainsKeyCommand(Object key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.containsKeyAsync(key); + } + } + + /** */ + private static class ContainsKeysCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Collection<Object> keys; + + /** + * @param keys Keys. + */ + ContainsKeysCommand(Collection<Object> keys) { + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.containsKeysAsync(keys); + } + } + + /** */ private static class GetCommand extends CacheProjectionCommand { /** */ private static final long serialVersionUID = 0L; @@ -723,6 +879,113 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** */ + private static class GetAndPutCommand extends CacheProjectionCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** Key. */ + protected final Object key; + + /** Value.*/ + protected final Object val; + + /** + * @param key Key. + * @param val Value. + */ + GetAndPutCommand(Object key, Object val) { + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.getAndPutAsync(key, val); + } + } + + /** */ + private static class GetAndReplaceCommand extends GetAndPutCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param key Key. + * @param val Value. + */ + GetAndReplaceCommand(Object key, Object val) { + super(key, val); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.getAndReplaceAsync(key, val); + } + } + + /** */ + private static class ReplaceValueCommand extends GetAndReplaceCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final Object oldVal; + + /** + * @param key Key. + * @param val Value. + * @param oldVal Old value. + */ + ReplaceValueCommand(Object key, Object val, Object oldVal) { + super(key, val); + this.oldVal = oldVal; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.replaceAsync(key, oldVal, val); + } + } + + /** */ + private static class GetAndPutIfAbsentCommand extends GetAndPutCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param key Key. + * @param val Value. + */ + GetAndPutIfAbsentCommand(Object key, Object val) { + super(key, val); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.getAndPutIfAbsentAsync(key, val); + } + } + + /** */ + private static class PutIfAbsentCommand extends GetAndPutCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param key Key. + * @param val Value. + */ + PutIfAbsentCommand(Object key, Object val) { + super(key, val); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.putIfAbsentAsync(key, val); + } + } + + /** */ private static class GetAllCommand extends CacheProjectionCommand { /** */ private static final long serialVersionUID = 0L; @@ -770,7 +1033,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { private static final long serialVersionUID = 0L; /** */ - private final Object key; + protected final Object key; /** * @param key Key. @@ -786,6 +1049,43 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** */ + private static class RemoveValueCommand extends GetAndPutCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param key Key. + * @param val Value. + */ + RemoveValueCommand(Object key, Object val) { + super(key, val); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.removeAsync(key, val); + } + } + + /** */ + private static class GetAndRemoveCommand extends RemoveCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param key Key. + */ + GetAndRemoveCommand(Object key) { + super(key); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.getAndRemoveAsync(key); + } + } + + /** */ private static class RemoveAllCommand extends CacheProjectionCommand { /** */ private static final long serialVersionUID = 0L; @@ -1025,4 +1325,69 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { ); } } + + /** */ + private static class SizeCommand extends CacheCommand { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> applyx(IgniteInternalCache<Object, Object> c, GridKernalContext ctx) { + return c.sizeAsync(new CachePeekMode[]{CachePeekMode.PRIMARY}); + } + } + + /** + * Destroy cache callable. + */ + private static class DestroyCacheCommand extends GetOrCreateCacheCallable { + + public DestroyCacheCommand(GridKernalContext ctx, String cacheName) { + super(ctx, cacheName); + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + ctx.grid().destroyCache(cacheName); + + return new GridRestResponse(); + } + catch (Exception e) { + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** + * Get or create cache callable. + */ + private static class GetOrCreateCacheCallable implements Callable<GridRestResponse> { + /** Kernal context. */ + protected GridKernalContext ctx; + + /** Cache name. */ + protected String cacheName; + + /** + * @param ctx Kernal context. + * @param cacheName Cache name. + */ + public GetOrCreateCacheCallable(GridKernalContext ctx, String cacheName) { + this.ctx = ctx; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + ctx.grid().getOrCreateCache(cacheName); + + return new GridRestResponse(); + } + catch (Exception e) { + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java new file mode 100644 index 0000000..3e49576 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/CacheQueryResult.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.query; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Client query result. + */ +public class CacheQueryResult implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Query ID. */ + private long qryId; + + /** Result items. */ + private Collection<?> items; + + /** Last flag. */ + private boolean last; + + /** + * @return Query ID. + */ + public long getQueryId() { + return qryId; + } + + /** + * @param qryId Query ID. + */ + public void setQueryId(long qryId) { + this.qryId = qryId; + } + + /** + * @return Items. + */ + public Collection<?> getItems() { + return items; + } + + /** + * @param items Items. + */ + public void setItems(Collection<?> items) { + this.items = items; + } + + /** + * @return Last flag. + */ + public boolean getLast() { + return last; + } + + /** + * @param last Last flag. + */ + public void setLast(boolean last) { + this.last = last; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheQueryResult.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeBoolean(last); + out.writeLong(qryId); + U.writeCollection(out, items); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + last = in.readBoolean(); + qryId = in.readLong(); + items = U.readCollection(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java new file mode 100644 index 0000000..097c7aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.handlers.query; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; + +/** + * Query command handler. + */ +public class QueryCommandHandler extends GridRestCommandHandlerAdapter { + /** Supported commands. */ + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY, + EXECUTE_SQL_FIELDS_QUERY, + FETCH_SQL_QUERY, + CLOSE_SQL_QUERY); + + /** Query ID sequence. */ + private static final AtomicLong qryIdGen = new AtomicLong(); + + /** Current queries. */ + private final ConcurrentHashMap<Long, Iterator> curs = new ConcurrentHashMap<>(); + + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, QueryCursor> qryCurs = new ConcurrentHashMap<>(); + + /** + * @param ctx Context. + */ + public QueryCommandHandler(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<GridRestResponse> handleAsync(GridRestRequest req) { + assert req != null; + + assert SUPPORTED_COMMANDS.contains(req.command()); + assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; + + switch (req.command()) { + case EXECUTE_SQL_QUERY: + case EXECUTE_SQL_FIELDS_QUERY: { + return ctx.closure().callLocalSafe( + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, curs, qryCurs), false); + } + + case FETCH_SQL_QUERY: { + return ctx.closure().callLocalSafe( + new FetchQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false); + } + + case CLOSE_SQL_QUERY: { + return ctx.closure().callLocalSafe( + new CloseQueryCallable((RestSqlQueryRequest)req, curs, qryCurs), false); + } + } + + return new GridFinishedFuture<>(); + } + + /** + * Execute query callable. + */ + private static class ExecuteQueryCallable implements Callable<GridRestResponse> { + /** Kernal context. */ + private GridKernalContext ctx; + + /** Execute query request. */ + private RestSqlQueryRequest req; + + /** Queries iterators. */ + private ConcurrentHashMap<Long, Iterator> curs; + + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + + /** + * @param ctx Kernal context. + * @param req Execute query request. + * @param curs Queries cursors. + */ + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, + ConcurrentHashMap<Long, Iterator> curs, ConcurrentHashMap<Long, QueryCursor> qryCurs) { + this.ctx = ctx; + this.req = req; + this.curs = curs; + this.qryCurs = qryCurs; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + Query qry; + + if (req.typeName() != null) { + qry = new SqlQuery(req.typeName(), req.sqlQuery()); + + ((SqlQuery)qry).setArgs(req.arguments()); + } + else { + qry = new SqlFieldsQuery(req.sqlQuery()); + + ((SqlFieldsQuery)qry).setArgs(req.arguments()); + } + + IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); + + if (cache == null) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "No cache with name. [cacheName=" + req.cacheName() + "]"); + + QueryCursor qryCur = cache.query(qry); + + Iterator cur = qryCur.iterator(); + + long qryId = qryIdGen.getAndIncrement(); + + qryCurs.put(qryId, qryCur); + curs.put(qryId, cur); + + CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, qryId); + + return new GridRestResponse(res); + } + catch (Exception e) { + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** + * Close query callable. + */ + private static class CloseQueryCallable implements Callable<GridRestResponse> { + /** Execute query request. */ + private RestSqlQueryRequest req; + + /** Queries iterators. */ + private ConcurrentHashMap<Long, Iterator> curs; + + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + + /** + * @param req Execute query request. + * @param curs Queries cursors. + */ + public CloseQueryCallable(RestSqlQueryRequest req, + ConcurrentHashMap<Long, Iterator> curs, + ConcurrentHashMap<Long, QueryCursor> qryCurs) { + this.req = req; + this.curs = curs; + this.qryCurs = qryCurs; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + QueryCursor cur = qryCurs.get(req.queryId()); + + if (cur == null) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Cannot find query [qryId=" + req.queryId() + "]"); + + cur.close(); + + qryCurs.remove(req.queryId()); + curs.remove(req.queryId()); + + return new GridRestResponse(true); + } + catch (Exception e) { + qryCurs.remove(req.queryId()); + curs.remove(req.queryId()); + + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** + * Fetch query callable. + */ + private static class FetchQueryCallable implements Callable<GridRestResponse> { + /** Execute query request. */ + private RestSqlQueryRequest req; + + /** Queries iterators. */ + private ConcurrentHashMap<Long, Iterator> curs; + + /** Queries cursors. */ + private ConcurrentHashMap<Long, QueryCursor> qryCurs; + + /** + * @param req Execute query request. + * @param curs Queries cursors. + */ + public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, Iterator> curs, + ConcurrentHashMap<Long, QueryCursor> qryCurs) { + this.req = req; + this.curs = curs; + this.qryCurs = qryCurs; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + try { + Iterator cur = curs.get(req.queryId()); + + if (cur == null) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Cannot find query [qryId=" + req.queryId() + "]"); + + CacheQueryResult res = createQueryResult(qryCurs, curs, cur, req, req.queryId()); + + return new GridRestResponse(res); + } + catch (Exception e) { + curs.remove(req.queryId()); + qryCurs.remove(req.queryId()); + + return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); + } + } + } + + /** + * @param qryCurs Query cursors. + * @param curs Queries iterators. + * @param cur Current cursor. + * @param req Sql request. + * @param qryId Query id. + * @return Query result with items. + */ + private static CacheQueryResult createQueryResult(ConcurrentHashMap<Long, QueryCursor> qryCurs, + ConcurrentHashMap<Long, Iterator> curs, Iterator cur, + RestSqlQueryRequest req, Long qryId) { + CacheQueryResult res = new CacheQueryResult(); + + List<Object> items = new ArrayList<>(); + + for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) + items.add(cur.next()); + + res.setItems(items); + + res.setLast(!cur.hasNext()); + + res.setQueryId(qryId); + + if (!cur.hasNext()) { + qryCurs.remove(qryId); + curs.remove(qryId); + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java index 2bfb704..bf09d30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/version/GridVersionCommandHandler.java @@ -30,11 +30,11 @@ import static org.apache.ignite.internal.IgniteVersionUtils.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; /** - * Handler for {@link GridRestCommand#VERSION} command. + * Handler for {@link GridRestCommand#VERSION} and {@link GridRestCommand#NAME} command. */ public class GridVersionCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ - private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(VERSION); + private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(VERSION, NAME); /** * @param ctx Context. @@ -54,6 +54,14 @@ public class GridVersionCommandHandler extends GridRestCommandHandlerAdapter { assert SUPPORTED_COMMANDS.contains(req.command()); - return new GridFinishedFuture<>(new GridRestResponse(VER_STR)); + switch (req.command()){ + case VERSION: + return new GridFinishedFuture<>(new GridRestResponse(VER_STR)); + + case NAME: + return new GridFinishedFuture<>(new GridRestResponse(ctx.gridName())); + } + + return new GridFinishedFuture<>(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java new file mode 100644 index 0000000..5ba3a50 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/request/RestSqlQueryRequest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.request; + +/** + * Sql query request. + */ +public class RestSqlQueryRequest extends GridRestRequest { + /** Sql query. */ + private String sqlQry; + + /** Sql query arguments. */ + private Object[] args; + + /** Page size. */ + private Integer pageSize; + + /** Cache name. */ + private String cacheName; + + /** Query id. */ + private Long qryId; + + /** Query type name. */ + private String typeName; + + /** + * @param sqlQry Sql query. + */ + public void sqlQuery(String sqlQry) { + this.sqlQry = sqlQry; + } + + /** + * @return Sql query. + */ + public String sqlQuery() { + return sqlQry; + } + + /** + * @param args Sql query arguments. + */ + public void arguments(Object[] args) { + this.args = args; + } + + /** + * @return Sql query arguments. + */ + public Object[] arguments() { + return args; + } + + /** + * @param pageSize Page size. + */ + public void pageSize(Integer pageSize) { + this.pageSize = pageSize; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** + * @param id Query id. + */ + public void queryId(Long id) { + this.qryId = id; + } + + /** + * @return Query id. + */ + public Long queryId() { + return qryId; + } + + /** + * @param typeName Query type name. + */ + public void typeName(String typeName) { + this.typeName = typeName; + } + + /** + * @return Query type name. + */ + public String typeName() { + return typeName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/952ac724/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java ---------------------------------------------------------------------- diff --git a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java index fac9818..d8bcac2 100644 --- a/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java +++ b/modules/rest-http/src/main/java/org/apache/ignite/internal/processors/rest/protocols/http/jetty/GridJettyRestHandler.java @@ -323,11 +323,21 @@ public class GridJettyRestHandler extends AbstractHandler { * @throws IgniteCheckedException If creation failed. */ @Nullable private GridRestRequest createRequest(GridRestCommand cmd, - Map<String, Object> params, - ServletRequest req) throws IgniteCheckedException { + Map<String, Object> params, HttpServletRequest req) throws IgniteCheckedException { GridRestRequest restReq; switch (cmd) { + case GET_OR_CREATE_CACHE: + case DESTROY_CACHE: { + GridRestCacheRequest restReq0 = new GridRestCacheRequest(); + + restReq0.cacheName((String)params.get("cacheName")); + + restReq = restReq0; + + break; + } + case ATOMIC_DECREMENT: case ATOMIC_INCREMENT: { DataStructuresRequest restReq0 = new DataStructuresRequest(); @@ -341,15 +351,25 @@ public class GridJettyRestHandler extends AbstractHandler { break; } + case CACHE_CONTAINS_KEY: + case CACHE_CONTAINS_KEYS: case CACHE_GET: case CACHE_GET_ALL: + case CACHE_GET_AND_PUT: + case CACHE_GET_AND_REPLACE: + case CACHE_PUT_IF_ABSENT: + case CACHE_GET_AND_PUT_IF_ABSENT: case CACHE_PUT: case CACHE_PUT_ALL: case CACHE_REMOVE: + case CACHE_REMOVE_VALUE: + case CACHE_REPLACE_VALUE: + case CACHE_GET_AND_REMOVE: case CACHE_REMOVE_ALL: case CACHE_ADD: case CACHE_CAS: case CACHE_METRICS: + case CACHE_SIZE: case CACHE_REPLACE: case CACHE_APPEND: case CACHE_PREPEND: { @@ -370,7 +390,8 @@ public class GridJettyRestHandler extends AbstractHandler { restReq0.cacheFlags(intValue("cacheFlags", params, 0)); restReq0.ttl(longValue("exp", params, null)); - if (cmd == CACHE_GET_ALL || cmd == CACHE_PUT_ALL || cmd == CACHE_REMOVE_ALL) { + if (cmd == CACHE_GET_ALL || cmd == CACHE_PUT_ALL || cmd == CACHE_REMOVE_ALL || + cmd == CACHE_CONTAINS_KEYS) { List<Object> keys = values("k", params); List<Object> vals = values("v", params); @@ -441,12 +462,53 @@ public class GridJettyRestHandler extends AbstractHandler { break; } + case NAME: case VERSION: { restReq = new GridRestRequest(); break; } + case EXECUTE_SQL_QUERY: + case EXECUTE_SQL_FIELDS_QUERY: { + RestSqlQueryRequest restReq0 = new RestSqlQueryRequest(); + + restReq0.sqlQuery((String) params.get("qry")); + + restReq0.arguments(values("arg", params).toArray()); + + restReq0.typeName((String)params.get("type")); + restReq0.pageSize(Integer.parseInt((String) params.get("psz"))); + restReq0.cacheName((String)params.get("cacheName")); + + restReq = restReq0; + + break; + } + + case FETCH_SQL_QUERY: { + RestSqlQueryRequest restReq0 = new RestSqlQueryRequest(); + + restReq0.queryId(Long.parseLong((String)params.get("qryId"))); + restReq0.pageSize(Integer.parseInt((String)params.get("psz"))); + restReq0.cacheName((String)params.get("cacheName")); + + restReq = restReq0; + + break; + } + + case CLOSE_SQL_QUERY: { + RestSqlQueryRequest restReq0 = new RestSqlQueryRequest(); + + restReq0.queryId(Long.parseLong((String)params.get("qryId"))); + restReq0.cacheName((String)params.get("cacheName")); + + restReq = restReq0; + + break; + } + default: throw new IgniteCheckedException("Invalid command: " + cmd); }