ignite-sql-tests - leak fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9424b9cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9424b9cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9424b9cd Branch: refs/heads/ignite-sql-tests Commit: 9424b9cd3e9373034d3fde4218716fa7fe9c386e Parents: 4e3b4cb Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Mar 4 00:28:33 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Mar 4 00:28:33 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 267 ++++++++++++++----- .../query/h2/twostep/GridMergeIndex.java | 9 +- .../h2/twostep/GridReduceQueryExecutor.java | 8 +- .../messages/GridQueryCancelRequest.java | 49 ++++ .../cache/GridCacheCrossCacheQuerySelfTest.java | 38 ++- 5 files changed, 298 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 8b09b16..ed0da58 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -32,10 +32,12 @@ import org.h2.result.*; import org.h2.value.*; import org.jdk8.backport.*; +import javax.cache.*; import java.lang.reflect.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static org.apache.ignite.events.EventType.*; @@ -95,9 +97,11 @@ public class GridMapQueryExecutor { boolean processed = true; if (msg instanceof GridQueryRequest) - executeLocalQuery(node, (GridQueryRequest)msg); + onQueryRequest(node, (GridQueryRequest)msg); else if (msg instanceof GridQueryNextPageRequest) - sendNextPage(node, (GridQueryNextPageRequest)msg); + onNextPageRequest(node, (GridQueryNextPageRequest)msg); + else if (msg instanceof GridQueryCancelRequest) + onCancel(node, (GridQueryCancelRequest)msg); else processed = false; @@ -115,33 +119,56 @@ public class GridMapQueryExecutor { /** * @param node Node. - * @param req Query request. + * @param msg Message. */ - private void executeLocalQuery(ClusterNode node, GridQueryRequest req) { - h2.setFilters(h2.backupFilter()); + private void onCancel(ClusterNode node, GridQueryCancelRequest msg) { + ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); - try { - QueryResults qr = new QueryResults(req.requestId(), req.queries().size()); + QueryResults results = nodeRess.remove(msg.queryRequestId()); - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); + if (results == null) + return; - if (nodeRess == null) { - nodeRess = new ConcurrentHashMap8<>(); + results.cancel(); + } - ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess); + /** + * @param nodeId Node ID. + * @return Results for node. + */ + private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) { + ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId); - if (old != null) - nodeRess = old; - } + if (nodeRess == null) { + nodeRess = new ConcurrentHashMap8<>(); - QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr); + ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(nodeId, nodeRess); - assert old == null; + if (old != null) + nodeRess = old; + } - // Prepare snapshots for all the needed tables before actual run. - for (GridCacheSqlQuery qry : req.queries()) { - // TODO - } + return nodeRess; + } + + /** + * Executing queries locally. + * + * @param node Node. + * @param req Query request. + */ + private void onQueryRequest(ClusterNode node, GridQueryRequest req) { + ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); + + QueryResults qr = new QueryResults(req.requestId(), req.queries().size()); + + if (nodeRess.putIfAbsent(req.requestId(), qr) != null) + throw new IllegalStateException(); + + h2.setFilters(h2.backupFilter()); + + try { + // TODO Prepare snapshots for all the needed tables before the run. // Run queries. int i = 0; @@ -152,13 +179,6 @@ public class GridMapQueryExecutor { ResultSet rs = h2.executeSqlQueryWithTimer(space, h2.connectionForSpace(space), qry.query(), F.asList(qry.parameters())); - assert rs instanceof JdbcResultSet : rs.getClass(); - - ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs); - - qr.results[i] = res; - qr.resultSets[i] = rs; - if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( node, @@ -175,13 +195,27 @@ public class GridMapQueryExecutor { null)); } + assert rs instanceof JdbcResultSet : rs.getClass(); + + qr.addResult(i, rs); + + if (qr.canceled) { + qr.result(i).close(); + + throw new IgniteException("Query was canceled."); + } + // Send the first page. - sendNextPage(node, qr, i, req.pageSize(), res.getRowCount()); + sendNextPage(nodeRess, node, qr, i, req.pageSize()); i++; } } catch (Throwable e) { + nodeRess.remove(req.requestId(), qr); + + qr.cancel(); + U.error(log, "Failed to execute local query: " + req, e); sendError(node, req.requestId(), e); @@ -212,16 +246,15 @@ public class GridMapQueryExecutor { * @param node Node. * @param req Request. */ - private void sendNextPage(ClusterNode node, GridQueryNextPageRequest req) { + private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); - if (qr == null) - sendError(node, req.queryRequestId(), - new IllegalStateException("No query result found for request: " + req)); + if (qr == null || qr.canceled) + sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); else - sendNextPage(node, qr, req.query(), req.pageSize(), -1); + sendNextPage(nodeRess, node, qr, req.query(), req.pageSize()); } /** @@ -229,36 +262,29 @@ public class GridMapQueryExecutor { * @param qr Query results. * @param qry Query. * @param pageSize Page size. - * @param allRows All rows count. */ - private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) { - int page; - - List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize)); - - ResultInterface res = qr.results[qry]; + private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, ClusterNode node, QueryResults qr, int qry, + int pageSize) { + QueryResult res = qr.result(qry); assert res != null; - boolean last = false; + int page = res.page; - synchronized (res) { - page = qr.pages[qry]++; + List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize)); - for (int i = 0 ; i < pageSize; i++) { - if (!res.next()) { - last = true; + boolean last = res.fetchNextPage(rows, pageSize); - break; - } + if (last) { + res.close(); - rows.add(res.currentRow()); - } + if (qr.isAllClosed()) + nodeRess.remove(qr.qryReqId, qr); } try { ctx.io().sendUserMessage(F.asList(node), - new GridQueryNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows), + new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, last, rows), GridTopic.TOPIC_QUERY, false, 0); } catch (IgniteCheckedException e) { @@ -271,29 +297,146 @@ public class GridMapQueryExecutor { /** * */ - private static class QueryResults { - /** */ - private long qryReqId; - + private class QueryResults { /** */ - private ResultInterface[] results; + private final long qryReqId; /** */ - private ResultSet[] resultSets; + private final AtomicReferenceArray<QueryResult> results; /** */ - private int[] pages; + private volatile boolean canceled; /** * @param qryReqId Query request ID. - * @param qrys Queries. + * @param qrys Number of queries. */ private QueryResults(long qryReqId, int qrys) { this.qryReqId = qryReqId; - results = new ResultInterface[qrys]; - resultSets = new ResultSet[qrys]; - pages = new int[qrys]; + results = new AtomicReferenceArray<>(qrys); + } + + /** + * @param qry Query result index. + * @return Query result. + */ + QueryResult result(int qry) { + return results.get(qry); + } + + /** + * @param qry Query result index. + * @param rs Result set. + */ + void addResult(int qry, ResultSet rs) { + if (!results.compareAndSet(qry, null, new QueryResult(rs))) + throw new IllegalStateException(); + } + + /** + * @return {@code true} If all results are closed. + */ + boolean isAllClosed() { + for (int i = 0; i < results.length(); i++) { + QueryResult res = results.get(i); + + if (res == null || !res.closed) + return false; + } + + return true; + } + + void cancel() { + if (canceled) + return; + + canceled = true; + + for (int i = 0; i < results.length(); i++) { + QueryResult res = results.get(i); + + if (res != null) + res.close(); + } + } + } + + /** + * Result for a single part of the query. + */ + private class QueryResult implements AutoCloseable { + /** */ + private final ResultInterface res; + + /** */ + private final ResultSet rs; + + /** */ + private int page; + + /** */ + private final int rowCount; + + /** */ + private volatile boolean closed; + + /** + * @param rs Result set. + */ + private QueryResult(ResultSet rs) { + this.rs = rs; + + try { + res = (ResultInterface)RESULT_FIELD.get(rs); + } + catch (IllegalAccessException e) { + throw new IllegalStateException(e); // Must not happen. + } + + rowCount = res.getRowCount(); + } + + /** + * @param rows Collection to fetch into. + * @param pageSize Page size. + * @return {@code true} If there are no more rows available. + */ + synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { + if (closed) + return true; + + page++; + + for (int i = 0 ; i < pageSize; i++) { + if (!res.next()) + return true; + + rows.add(res.currentRow()); + } + + return false; + } + + /** {@inheritDoc} */ + @Override public synchronized void close() { + if (closed) + return; + + closed = true; + + Statement stmt; + + try { + stmt = rs.getStatement(); + } + catch (SQLException e) { + throw new IllegalStateException(e); // Must not happen. + } + + U.close(rs, log); + U.close(stmt, log); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 71cc151..4632e0b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -112,13 +112,20 @@ public abstract class GridMergeIndex extends BaseIndex { if (fetched == null) throw new IgniteException("Fetched result set was too large."); - if (fetched.size() == cnt.get()) // We've fetched all the rows. + if (fetchedAll()) return findAllFetched(fetched, first, last); return findInStream(first, last); } /** + * @return {@code true} If we have fetched all the remote rows. + */ + public boolean fetchedAll() { + return fetched.size() == cnt.get(); + } + + /** * @param first First row. * @param last Last row. * @return Cursor. Usually it must be {@link FetchingCursor} instance. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 9aec279..2804d90 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -164,7 +164,7 @@ public class GridReduceQueryExecutor { r.conn = h2.connectionForSpace(space); // TODO Add topology version. - Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(space).nodes(); + final Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(space).nodes(); for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl; @@ -198,8 +198,12 @@ public class GridReduceQueryExecutor { final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters())); - for (GridMergeTable tbl : r.tbls) + for (GridMergeTable tbl : r.tbls) { + if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. + ctx.io().sendUserMessage(nodes, new GridQueryCancelRequest(qryReqId), GridTopic.TOPIC_QUERY, false, 0); + dropTable(r.conn, tbl.getName()); + } return new QueryCursorImpl<>(new Iter(res)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java new file mode 100644 index 0000000..9bcb410 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Cancel request. + */ +public class GridQueryCancelRequest implements Serializable { + /** */ + private long qryReqId; + + /** + * @param qryReqId Query request ID. + */ + public GridQueryCancelRequest(long qryReqId) { + this.qryReqId = qryReqId; + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryCancelRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 8445912..2c38f4e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -70,6 +70,8 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { ignite = startGridsMultiThreaded(3); + + fillCaches(); } /** {@inheritDoc} */ @@ -117,8 +119,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTwoStep() throws Exception { - fillCaches(); - String cache = "partitioned"; GridCacheQueriesEx<Integer, FactPurchase> qx = @@ -141,8 +141,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTwoStepGroupAndAggregates() throws Exception { - fillCaches(); - GridCacheQueriesEx<Integer, FactPurchase> qx = (GridCacheQueriesEx<Integer, FactPurchase>)((IgniteKernal)ignite) .<Integer, FactPurchase>cache("partitioned").queries(); @@ -235,8 +233,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testApiQueries() throws Exception { - fillCaches(); - IgniteCache<Object,Object> c = ignite.jcache("partitioned"); c.queryFields(new SqlFieldsQuery("select cast(? as varchar) from FactPurchase").setArgs("aaa")).getAll(); @@ -249,6 +245,34 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { assertEquals(8, res.get(0).get(1)); } + public void _testLoop() throws InterruptedException { + IgniteCache<Object,Object> c = ignite.jcache("partitioned"); + + X.println("___ GET READY"); + + Thread.sleep(20000); + + X.println("___ GO"); + + long start = System.currentTimeMillis(); + + for (int i = 0; i < 1000000; i++) { + if (i % 1000 == 0) { + long t = System.currentTimeMillis(); + + X.println("__ " + i + " -> " + (t - start)); + + start = t; + } + + c.queryFields(new SqlFieldsQuery("select * from FactPurchase")).getAll(); + } + + X.println("___ OK"); + + Thread.sleep(300000); + } + /** * @param l List. * @param idx Index. @@ -260,8 +284,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { /** @throws Exception If failed. */ public void testOnProjection() throws Exception { - fillCaches(); - CacheProjection<Integer, FactPurchase> prj = ((IgniteKernal)ignite) .<Integer, FactPurchase>cache("partitioned").projection( new IgnitePredicate<Cache.Entry<Integer, FactPurchase>>() {