Repository: incubator-ignite Updated Branches: refs/heads/ignite-1161 0f7816def -> 6de1f082f
#ignite-1161: add new class for query cursor. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6de1f082 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6de1f082 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6de1f082 Branch: refs/heads/ignite-1161 Commit: 6de1f082f1df4dd9c9cccc30ea3f9c46b253cf28 Parents: 0f7816d Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jul 31 15:46:31 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jul 31 15:46:31 2015 +0300 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 2 +- .../handlers/query/QueryCommandHandler.java | 203 +++++++++++++++---- 2 files changed, 162 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 0468c9b..9e82f13 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -1204,7 +1204,7 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro Map<String, String> params = new HashMap<>(); params.put("cmd", GridRestCommand.EXECUTE_SQL_QUERY.key()); params.put("type", "Person"); - params.put("psz", "1"); + params.put("pageSize", "1"); params.put("cacheName", "person"); params.put("qry", URLEncoder.encode(qry)); params.put("arg1", "1000"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6de1f082/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index bb19f2a..d992f23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -26,12 +26,12 @@ import org.apache.ignite.internal.processors.rest.*; import org.apache.ignite.internal.processors.rest.handlers.*; import org.apache.ignite.internal.processors.rest.request.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; @@ -49,8 +49,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private static final AtomicLong qryIdGen = new AtomicLong(); /** Current queries cursors. */ - private final static ConcurrentHashMap<Long, GridTuple3<QueryCursor, Iterator, Long>> qryCurs = - new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs = new ConcurrentHashMap<>(); /** * @param ctx Context. @@ -66,13 +65,24 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { @Override public void run() { long time = U.currentTimeMillis(); - for (Map.Entry<Long, GridTuple3<QueryCursor, Iterator, Long>> e : qryCurs.entrySet()) { - synchronized (e.getValue()) { - long createTime = e.getValue().get3(); + for (Map.Entry<Long, QueryCursorIterator> e : qryCurs.entrySet()) { + QueryCursorIterator val = e.getValue(); - if (createTime + idleQryCurTimeout > time && qryCurs.remove(e.getKey(), e.getValue())) - e.getValue().get1().close(); - } + long createTime = val.lastUsage(); + + if (createTime + idleQryCurTimeout > time) + if (val.lock().tryLock()) { + try { + val.lastUsage(-1); + + qryCurs.remove(e.getKey(), val); + + val.close(); + } + finally { + val.lock().unlock(); + } + } } } }, qryCheckFrq, qryCheckFrq); @@ -94,17 +104,17 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: { return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req), false); + new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false); } case FETCH_SQL_QUERY: { return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req), false); + new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false); } case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new CloseQueryCallable((RestSqlQueryRequest)req), false); + new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false); } } @@ -121,13 +131,19 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs; + /** * @param ctx Kernal context. * @param req Execute query request. + * @param qryCurs Query cursors. */ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req) { + public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, + ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) { this.ctx = ctx; this.req = req; + this.qryCurs = qryCurs; } /** {@inheritDoc} */ @@ -158,12 +174,14 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { Iterator cur = qryCur.iterator(); - GridTuple3<QueryCursor, Iterator, Long> val = new GridTuple3<>(qryCur, cur, U.currentTimeMillis()); + QueryCursorIterator val = new QueryCursorIterator(qryCur, cur); - synchronized (val) { + val.lock().lock(); + + try { qryCurs.put(qryId, val); - CacheQueryResult res = createQueryResult(cur, req, qryId); + CacheQueryResult res = createQueryResult(cur, req, qryId, qryCurs); List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta(); @@ -171,9 +189,12 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { return new GridRestResponse(res); } + finally { + val.lock().unlock(); + } } catch (Exception e) { - removeQueryCursor(qryId); + removeQueryCursor(qryId, qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -202,34 +223,44 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs; + /** * @param req Execute query request. + * @param qryCurs Query cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req) { + public CloseQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) { this.req = req; + this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - GridTuple3<QueryCursor, Iterator, Long> val = qryCurs.get(req.queryId()); + QueryCursorIterator val = qryCurs.get(req.queryId()); if (val == null) - return new GridRestResponse(GridRestResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); + return new GridRestResponse(true); + + val.lock().lock(); - synchronized (val) { - QueryCursor cur = val.get1(); + try{ + if (val.lastUsage() == -1) + return new GridRestResponse(true); - cur.close(); + val.close(); qryCurs.remove(req.queryId()); } + finally { + val.lock().unlock(); + } return new GridRestResponse(true); } catch (Exception e) { - removeQueryCursor(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -243,34 +274,48 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Execute query request. */ private RestSqlQueryRequest req; + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs; + /** * @param req Execute query request. + * @param qryCurs Query cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req) { + public FetchQueryCallable(RestSqlQueryRequest req, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) { this.req = req; + this.qryCurs = qryCurs; } /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { try { - GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(req.queryId()); + QueryCursorIterator val = qryCurs.get(req.queryId()); - if (t == null) + if (val == null) return new GridRestResponse(GridRestResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); - synchronized (t) { - t.set3(System.currentTimeMillis()); + val.lock().lock(); - Iterator cur = t.get2(); + try { + if (val.lastUsage() == -1) + return new GridRestResponse(GridRestResponse.STATUS_FAILED, + "Query is closed by timeout. Restart query with ID: " + req.queryId()); - CacheQueryResult res = createQueryResult(cur, req, req.queryId()); + val.lastUsage(U.currentTimeMillis()); + + Iterator cur = val.iterator(); + + CacheQueryResult res = createQueryResult(cur, req, req.queryId(), qryCurs); return new GridRestResponse(res); } + finally { + val.lock().unlock(); + } } catch (Exception e) { - removeQueryCursor(req.queryId()); + removeQueryCursor(req.queryId(), qryCurs); return new GridRestResponse(GridRestResponse.STATUS_FAILED, e.getMessage()); } @@ -281,9 +326,11 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * @param cur Current cursor. * @param req Sql request. * @param qryId Query id. + * @param qryCurs Query cursors. * @return Query result with items. */ - private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId) { + private static CacheQueryResult createQueryResult(Iterator cur, RestSqlQueryRequest req, Long qryId, + ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) { CacheQueryResult res = new CacheQueryResult(); List<Object> items = new ArrayList<>(); @@ -298,7 +345,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { res.setQueryId(qryId); if (!cur.hasNext()) - removeQueryCursor(qryId); + removeQueryCursor(qryId, qryCurs); return res; } @@ -307,16 +354,88 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Removes query cursor. * * @param qryId Query id. + * @param qryCurs Query cursors. */ - private static void removeQueryCursor(Long qryId) { - GridTuple3<QueryCursor, Iterator, Long> t = qryCurs.get(qryId); + private static void removeQueryCursor(Long qryId, ConcurrentHashMap<Long, QueryCursorIterator> qryCurs) { + QueryCursorIterator t = qryCurs.get(qryId); - if (t != null) { - synchronized (t) { - t.get1().close(); + if (t == null) + return; - qryCurs.remove(qryId); - } + t.lock().lock(); + + try{ + if (t.lastUsage() == -1) + return; + + t.close(); + + qryCurs.remove(qryId); + } + finally { + t.lock().unlock(); + } + } + + /** + * Query cursor iterator. + */ + private static class QueryCursorIterator { + /** Query cursor. */ + private QueryCursor cur; + + /** Query iterator. */ + private Iterator iter; + + /** Last timestamp. */ + private volatile long lastUsage; + + /** Reentrant lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** + * @param cur Query cursor. + * @param iter Query iterator. + */ + public QueryCursorIterator(QueryCursor cur, Iterator iter) { + this.cur = cur; + this.iter = iter; + lastUsage = U.currentTimeMillis(); + } + + /** + * @return Lock. + */ + public ReentrantLock lock() { + return lock; + } + + /** + * @return Query iterator. + */ + public Iterator iterator() { + return iter; + } + + /** + * @return Last usage + */ + public long lastUsage() { + return lastUsage; + } + + /** + * @param time Current time or -1 if cursor is closed. + */ + public void lastUsage(long time) { + lastUsage = time; + } + + /** + * Close query cursor. + */ + public void close() { + cur.close(); } } }