Repository: incubator-ignite Updated Branches: refs/heads/ignite-1161 a1601d791 -> a9eb9da03
#ignite-1161: replace scheduler with ignite scheduler. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a9eb9da0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a9eb9da0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a9eb9da0 Branch: refs/heads/ignite-1161 Commit: a9eb9da039cf6fa4c703c8a337145b8bf091723f Parents: a1601d7 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue Jul 28 17:54:23 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue Jul 28 17:54:23 2015 +0300 ---------------------------------------------------------------------- .../rest/AbstractRestProcessorSelfTest.java | 3 +- .../JettyRestProcessorAbstractSelfTest.java | 7 +- .../configuration/ConnectorConfiguration.java | 60 +++++++++--- .../handlers/query/QueryCommandHandler.java | 99 ++++++++++---------- 4 files changed, 104 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java index b5b430c..9b26bd8 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/AbstractRestProcessorSelfTest.java @@ -73,7 +73,8 @@ abstract class AbstractRestProcessorSelfTest extends GridCommonAbstractTest { clientCfg.setJettyPath("modules/clients/src/test/resources/jetty/rest-jetty.xml"); - 
clientCfg.setQueryRemoveDelay(5); + clientCfg.setIdleQueryCursorTimeout(5000); + clientCfg.setQueryCheckFrequency(5000); cfg.setConnectorConfiguration(clientCfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 91dfa66..29ca521 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -1210,7 +1210,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro params.put("arg1", "1000"); params.put("arg2", "2000"); - String ret = content(params); + String ret = null; + + for (int i = 0; i < 10; ++i) + ret = content(params); assertNotNull(ret); assertTrue(!ret.isEmpty()); @@ -1223,7 +1226,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertTrue(queryCursorFound()); - U.sleep(12000); + U.sleep(10000); assertFalse(queryCursorFound()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java index 237f4b1..bd849a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java +++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/ConnectorConfiguration.java @@ -61,8 +61,11 @@ public class ConnectorConfiguration { /** Default socket send and receive buffer size. */ public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024; - /** Default delay for storing query cursor (10 minutes). */ - private static final int DFLT_QRY_RMV_DELAY = 10 * 60; + /** Default REST idle timeout for query cursor (10 minutes, in milliseconds). */ + private static final long DFLT_IDLE_QRY_CUR_TIMEOUT = 10 * 60 * 1000; + + /** Default REST check frequency for idle query cursors (1 minute, in milliseconds). */ + private static final long DFLT_QRY_CHECK_FRQ_TIMEOUT = 60 * 1000; /** Jetty XML configuration path. */ private String jettyPath; @@ -88,8 +91,11 @@ public class ConnectorConfiguration { /** REST TCP receive buffer size. */ private int rcvBufSize = DFLT_SOCK_BUF_SIZE; - /** REST delay for storing query cursor. */ - private int qryRmvDelay = DFLT_QRY_RMV_DELAY; + /** REST idle timeout for query cursor. */ + private long idleQryCurTimeout = DFLT_IDLE_QRY_CUR_TIMEOUT; + + /** REST check frequency for idle query cursors. */ + private long qryCheckFrq = DFLT_QRY_CHECK_FRQ_TIMEOUT; /** REST TCP send queue limit. */ private int sndQueueLimit; @@ -154,7 +160,8 @@ public class ConnectorConfiguration { sslClientAuth = cfg.isSslClientAuth(); sslCtxFactory = cfg.getSslContextFactory(); sslEnabled = cfg.isSslEnabled(); - qryRmvDelay = cfg.getQueryRemoveDelay(); + idleQryCurTimeout = cfg.getIdleQueryCursorTimeout(); + qryCheckFrq = cfg.getQueryCheckFrequency(); } /** @@ -556,18 +563,47 @@ public class ConnectorConfiguration { } /** - * Sets delay for removing query cursors that are not used. + * Sets idle query cursors timeout. + * + * @param idleQryCurTimeout Idle query cursors timeout in milliseconds. + * @see #getIdleQueryCursorTimeout() + */ + public void setIdleQueryCursorTimeout(long idleQryCurTimeout) { + this.idleQryCurTimeout = idleQryCurTimeout; + } + + /** + * Gets idle query cursors timeout in milliseconds. 
+ * <p> + * This setting is used to close open query cursors that are not being used. If no fetch query request + * comes within the idle timeout, the cursor will be removed on the next check for old query cursors + * (see {@link #getQueryCheckFrequency()}). * - * @param qryRmvDelay Query remove delay in seconds. + * @return Idle query cursors timeout in milliseconds. */ - public void setQueryRemoveDelay(int qryRmvDelay) { - this.qryRmvDelay = qryRmvDelay; + public long getIdleQueryCursorTimeout() { + return idleQryCurTimeout; } /** - * Gets delay for removing query cursors that are not used. + * Sets query check frequency. + * + * @param qryCheckFrq Idle query check frequency in milliseconds. + * @see #getQueryCheckFrequency() + */ + public void setQueryCheckFrequency(long qryCheckFrq) { + this.qryCheckFrq = qryCheckFrq; + } + + /** + * Gets query cursors check frequency. + * This setting is used to close open query cursors that are not being used. + * <p> + * The scheduler tries, with the specified period, to close query cursors whose idle timeout has expired. + * + * @return Query check frequency in milliseconds. 
*/ - public int getQueryRemoveDelay() { - return qryRmvDelay; + public long getQueryCheckFrequency() { + return qryCheckFrq; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a9eb9da0/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 18a2ae7..ffac32c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -27,7 +27,6 @@ import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; @@ -50,22 +49,36 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Boolean>> qryCurs = + private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs = new ConcurrentHashMap<>(); - /** Remove delay. */ - private static int rmvDelay = 0; - - /** Scheduler. */ - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1); - /** * @param ctx Context. 
*/ public QueryCommandHandler(GridKernalContext ctx) { super(ctx); - rmvDelay = ctx.config().getConnectorConfiguration().getQueryRemoveDelay(); + final long idleQryCurTimeout = ctx.config().getConnectorConfiguration().getIdleQueryCursorTimeout(); + + long qryCheckFrq = ctx.config().getConnectorConfiguration().getQueryCheckFrequency(); + + ctx.timeout().schedule(new Runnable() { + @Override public void run() { + long time = System.currentTimeMillis(); + + for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) { + synchronized (e.getValue()) { + long createTime = e.getValue().get3(); + + if (createTime + idleQryCurTimeout > time) { + e.getValue().get1().close(); + + qryCurs.remove(e.getKey()); + } + } + } + } + }, qryCheckFrq, qryCheckFrq); } /** {@inheritDoc} */ @@ -148,17 +161,20 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { Iterator cur = qryCur.iterator(); - qryCurs.put(qryId, new GridTuple3<>(qryCur, cur, true)); + GridTuple3<QueryCursor, Iterator, Long> val = + new GridTuple3<>(qryCur, cur, System.currentTimeMillis()); - scheduleRemove(qryId); + synchronized (val) { + qryCurs.put(qryId, val); - CacheQueryResult res = createQueryResult(cur, req, qryId); + CacheQueryResult res = createQueryResult(cur, req, qryId); - List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta(); + List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta(); - res.setFieldsMetadata(convertMetadata(fieldsMeta)); + res.setFieldsMetadata(convertMetadata(fieldsMeta)); - return new GridRestResponse(res); + return new GridRestResponse(res); + } } catch (Exception e) { qryCurs.remove(qryId); @@ -200,15 +216,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + GridTuple3<QueryCursor, Iterator, Long> val = 
qryCurs.get(req.queryId()); - if (cur == null) + if (val == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - cur.close(); + synchronized (val) { + QueryCursor cur = val.get1(); - qryCurs.remove(req.queryId()); + cur.close(); + + qryCurs.remove(req.queryId()); + } return new GridRestResponse(true); } @@ -237,19 +257,21 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(req.queryId()); + GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId()); - t.set3(true); - - Iterator cur = t.get2(); - - if (cur == null) + if (t == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Cannot find query [qryId=" + req.queryId() + "]"); - CacheQueryResult res = createQueryResult(cur, req, req.queryId()); + synchronized (t) { + t.set3(System.currentTimeMillis()); + + Iterator cur = t.get2(); + + CacheQueryResult res = createQueryResult(cur, req, req.queryId()); - return new GridRestResponse(res); + return new GridRestResponse(res); + } } catch (Exception e) { qryCurs.remove(req.queryId()); @@ -284,27 +306,4 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return res; } - - /** - * Schedule remove for query cursor. - * - * @param id Query id. - */ - private static void scheduleRemove(final long id) { - SCHEDULER.schedule(new CAX() { - @Override public void applyx() throws IgniteCheckedException { - GridTuple3<QueryCursor, Iterator, Boolean> t = qryCurs.get(id); - - if (t != null) { - if (t.get3()) { - t.set3(false); - - scheduleRemove(id); - } - else - qryCurs.remove(id); - } - } - }, rmvDelay, TimeUnit.SECONDS); - } }