Repository: incubator-ignite Updated Branches: refs/heads/ignite-1161 [created] a1601d791
#ignite-1161: add schedule remover for query cursor. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a1601d79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a1601d79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a1601d79 Branch: refs/heads/ignite-1161 Commit: a1601d79181755b985dab80e02c08bdf05788722 Parents: a127756 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Jul 28 12:02:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Jul 28 12:02:27 2015 +0300 ---------------------------------------------------------------------- .../rest/AbstractRestProcessorSelfTest.java | 2 + .../JettyRestProcessorAbstractSelfTest.java | 34 ++++++++ .../configuration/ConnectorConfiguration.java | 23 +++++ .../handlers/query/QueryCommandHandler.java | 90 ++++++++++++-------- 4 files changed, 113 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index 8310b0f..b5b430c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -73,6 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); + clientCfg.setQueryRemoveDelay(5); + cfg.setConnectorConfiguration(clientCfg); TcpDiscoverySpi disco = new TcpDiscoverySpi(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 8ce070f..91dfa66 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import java.io.*; @@ -1194,6 +1195,39 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertFalse(queryCursorFound()); } + /** + * @throws Exception If failed. + */ + public void testQueryDelay() throws Exception { + String qry = "salary > ? and salary <= ?"; + + Map<String, String> params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); + params.put("type", "Person"); + params.put("psz", "1"); + params.put("cacheName", "person"); + params.put("qry", URLEncoder.encode(qry)); + params.put("arg1", "1000"); + params.put("arg2", "2000"); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(1, items.size()); + + assertTrue(queryCursorFound()); + + U.sleep(12000); + + assertFalse(queryCursorFound()); + } + protected abstract String signature() throws Exception; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 98753e2..237f4b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -61,6 +61,9 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; + /** Default delay for storing query cursor (10 minutes). */ + private static final int DFLT_QRY_RMV_DELAY = 10 * 60; + /** Jetty XML configuration path. */ private String jettyPath; @@ -85,6 +88,9 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; + /** REST delay for storing query cursor. */ + private int qryRmvDelay = DFLT_QRY_RMV_DELAY; + /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -148,6 +154,7 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); + qryRmvDelay = cfg.getQueryRemoveDelay(); } /** @@ -547,4 +554,20 @@ public class ConnectorConfiguration { public void setMessageInterceptor(ConnectorMessageInterceptor interceptor) { msgInterceptor = interceptor; } + + /** + * Sets delay for removing query cursors that are not used. + * + * @param qryRmvDelay Query remove delay in seconds. + */ + public void setQueryRemoveDelay(int qryRmvDelay) { + this.qryRmvDelay = qryRmvDelay; + } + + /** + * Gets delay for removing query cursors that are not used. + */ + public int getQueryRemoveDelay() { + return qryRmvDelay; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a1601d79/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 59f95c9..18a2ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -26,8 +26,9 @@ import org.apache.ignite.internal.processors.rest.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import java.util.*; import java.util.concurrent.*; @@ -49,13 +50,22 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>(); + private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs = + new ConcurrentHashMap<>(); + + /** Remove delay. */ + private static int rmvDelay = 0; + + /** Scheduler. */ + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1); /** * @param ctx Context. */ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); + + rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay(); } /** {@inheritDoc} */ @@ -74,17 +84,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: { return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false); + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false); } case FETCH_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new FetchQueryCallable((RestSqlQueryRequest)req), false); } case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new CloseQueryCallable((RestSqlQueryRequest)req), false); } } @@ -101,24 +111,18 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; - /** * @param ctx Kernal context. * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, - ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) { this.ctx = ctx; this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - long qryId = qryIdGen.getAndIncrement(); + final long qryId = qryIdGen.getAndIncrement(); try { Query qry; @@ -140,13 +144,15 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(GridRestResponse.STATUS_FAILED, "No cache with name [cacheName=" + req.cacheName() + "]"); - QueryCursor qryCur = cache.query(qry); + final QueryCursor qryCur = cache.query(qry); Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true)); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); + scheduleRemove(qryId); + + CacheQueryResult res = createQueryResult(cur, req, qryId); List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta(); @@ -184,17 +190,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { + public CloseQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @@ -227,29 +227,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; - /** Queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; - /** * @param req Execute query request. - * @param qryCurs Queries cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, - ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { + public FetchQueryCallable(RestSqlQueryRequest req) { this.req = req; - this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId()); + + t.set3(true); + + Iterator cur = t.get2(); if (cur == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - CacheQueryResult res = createQueryResult(qryCurs, cur, req, req.queryId()); + CacheQueryResult res = createQueryResult(cur, req, req.queryId()); return new GridRestResponse(res); } @@ -262,15 +260,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param qryCurs Query cursors. * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. * @return Query result with items. */ - private static CacheQueryResult createQueryResult( - ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs, - Iterator cur, RestSqlQueryRequest req, Long qryId) { + private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) { CacheQueryResult res = new CacheQueryResult(); List<Object> items = new ArrayList<>(); @@ -289,4 +284,27 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return res; } + + /** + * Schedule remove for query cursor. + * + * @param id Query id. + */ + private static void scheduleRemove(final long id) { + SCHEDULER.schedule(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id); + + if (t != null) { + if (t.get3()) { + t.set3(false); + + scheduleRemove(id); + } + else + qryCurs.remove(id); + } + } + }, rmvDelay, TimeUnit.SECONDS); + } }