ignite-sql-tests - leak fixes

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9424b9cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9424b9cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9424b9cd

Branch: refs/heads/ignite-sql-tests
Commit: 9424b9cd3e9373034d3fde4218716fa7fe9c386e
Parents: 4e3b4cb
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Wed Mar 4 00:28:33 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Wed Mar 4 00:28:33 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 267 ++++++++++++++-----
 .../query/h2/twostep/GridMergeIndex.java        |   9 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   8 +-
 .../messages/GridQueryCancelRequest.java        |  49 ++++
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  38 ++-
 5 files changed, 298 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 8b09b16..ed0da58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -32,10 +32,12 @@ import org.h2.result.*;
 import org.h2.value.*;
 import org.jdk8.backport.*;
 
+import javax.cache.*;
 import java.lang.reflect.*;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.events.EventType.*;
 
@@ -95,9 +97,11 @@ public class GridMapQueryExecutor {
                     boolean processed = true;
 
                     if (msg instanceof GridQueryRequest)
-                        executeLocalQuery(node, (GridQueryRequest)msg);
+                        onQueryRequest(node, (GridQueryRequest)msg);
                     else if (msg instanceof GridQueryNextPageRequest)
-                        sendNextPage(node, (GridQueryNextPageRequest)msg);
+                        onNextPageRequest(node, (GridQueryNextPageRequest)msg);
+                    else if (msg instanceof GridQueryCancelRequest)
+                        onCancel(node, (GridQueryCancelRequest)msg);
                     else
                         processed = false;
 
@@ -115,33 +119,56 @@ public class GridMapQueryExecutor {
 
     /**
      * @param node Node.
-     * @param req Query request.
+     * @param msg Message.
      */
-    private void executeLocalQuery(ClusterNode node, GridQueryRequest req) {
-        h2.setFilters(h2.backupFilter());
+    private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
+        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
 
-        try {
-            QueryResults qr = new QueryResults(req.requestId(), 
req.queries().size());
+        QueryResults results = nodeRess.remove(msg.queryRequestId());
 
-            ConcurrentMap<Long, QueryResults> nodeRess = 
qryRess.get(node.id());
+        if (results == null)
+            return;
 
-            if (nodeRess == null) {
-                nodeRess = new ConcurrentHashMap8<>();
+        results.cancel();
+    }
 
-                ConcurrentMap<Long, QueryResults> old = 
qryRess.putIfAbsent(node.id(), nodeRess);
+    /**
+     * @param nodeId Node ID.
+     * @return Results for node.
+     */
+    private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) {
+        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId);
 
-                if (old != null)
-                    nodeRess = old;
-            }
+        if (nodeRess == null) {
+            nodeRess = new ConcurrentHashMap8<>();
 
-            QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr);
+            ConcurrentMap<Long, QueryResults> old = 
qryRess.putIfAbsent(nodeId, nodeRess);
 
-            assert old == null;
+            if (old != null)
+                nodeRess = old;
+        }
 
-            // Prepare snapshots for all the needed tables before actual run.
-            for (GridCacheSqlQuery qry : req.queries()) {
-                // TODO
-            }
+        return nodeRess;
+    }
+
+    /**
+     * Executing queries locally.
+     *
+     * @param node Node.
+     * @param req Query request.
+     */
+    private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
+        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
+
+        QueryResults qr = new QueryResults(req.requestId(), 
req.queries().size());
+
+        if (nodeRess.putIfAbsent(req.requestId(), qr) != null)
+            throw new IllegalStateException();
+
+        h2.setFilters(h2.backupFilter());
+
+        try {
+            // TODO Prepare snapshots for all the needed tables before the run.
 
             // Run queries.
             int i = 0;
@@ -152,13 +179,6 @@ public class GridMapQueryExecutor {
                 ResultSet rs = h2.executeSqlQueryWithTimer(space, 
h2.connectionForSpace(space), qry.query(),
                     F.asList(qry.parameters()));
 
-                assert rs instanceof JdbcResultSet : rs.getClass();
-
-                ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs);
-
-                qr.results[i] = res;
-                qr.resultSets[i] = rs;
-
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                     ctx.event().record(new CacheQueryExecutedEvent<>(
                         node,
@@ -175,13 +195,27 @@ public class GridMapQueryExecutor {
                         null));
                 }
 
+                assert rs instanceof JdbcResultSet : rs.getClass();
+
+                qr.addResult(i, rs);
+
+                if (qr.canceled) {
+                    qr.result(i).close();
+
+                    throw new IgniteException("Query was canceled.");
+                }
+
                 // Send the first page.
-                sendNextPage(node, qr, i, req.pageSize(), res.getRowCount());
+                sendNextPage(nodeRess, node, qr, i, req.pageSize());
 
                 i++;
             }
         }
         catch (Throwable e) {
+            nodeRess.remove(req.requestId(), qr);
+
+            qr.cancel();
+
             U.error(log, "Failed to execute local query: " + req, e);
 
             sendError(node, req.requestId(), e);
@@ -212,16 +246,15 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param req Request.
      */
-    private void sendNextPage(ClusterNode node, GridQueryNextPageRequest req) {
+    private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest 
req) {
         ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
 
         QueryResults qr = nodeRess == null ? null : 
nodeRess.get(req.queryRequestId());
 
-        if (qr == null)
-            sendError(node, req.queryRequestId(),
-                new IllegalStateException("No query result found for request: 
" + req));
+        if (qr == null || qr.canceled)
+            sendError(node, req.queryRequestId(), new CacheException("No query 
result found for request: " + req));
         else
-            sendNextPage(node, qr, req.query(), req.pageSize(), -1);
+            sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
     }
 
     /**
@@ -229,36 +262,29 @@ public class GridMapQueryExecutor {
      * @param qr Query results.
      * @param qry Query.
      * @param pageSize Page size.
-     * @param allRows All rows count.
      */
-    private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int 
pageSize, int allRows) {
-        int page;
-
-        List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
-
-        ResultInterface res = qr.results[qry];
+    private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, 
ClusterNode node, QueryResults qr, int qry,
+        int pageSize) {
+        QueryResult res = qr.result(qry);
 
         assert res != null;
 
-        boolean last = false;
+        int page = res.page;
 
-        synchronized (res) {
-            page = qr.pages[qry]++;
+        List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
 
-            for (int i = 0 ; i < pageSize; i++) {
-                if (!res.next()) {
-                    last = true;
+        boolean last = res.fetchNextPage(rows, pageSize);
 
-                    break;
-                }
+        if (last) {
+            res.close();
 
-                rows.add(res.currentRow());
-            }
+            if (qr.isAllClosed())
+                nodeRess.remove(qr.qryReqId, qr);
         }
 
         try {
             ctx.io().sendUserMessage(F.asList(node),
-                new GridQueryNextPageResponse(qr.qryReqId, qry, page, allRows, 
last, rows),
+                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 
0 ? res.rowCount : -1, last, rows),
                 GridTopic.TOPIC_QUERY, false, 0);
         }
         catch (IgniteCheckedException e) {
@@ -271,29 +297,146 @@ public class GridMapQueryExecutor {
     /**
      *
      */
-    private static class QueryResults {
-        /** */
-        private long qryReqId;
-
+    private class QueryResults {
         /** */
-        private ResultInterface[] results;
+        private final long qryReqId;
 
         /** */
-        private ResultSet[] resultSets;
+        private final AtomicReferenceArray<QueryResult> results;
 
         /** */
-        private int[] pages;
+        private volatile boolean canceled;
 
         /**
          * @param qryReqId Query request ID.
-         * @param qrys Queries.
+         * @param qrys Number of queries.
          */
         private QueryResults(long qryReqId, int qrys) {
             this.qryReqId = qryReqId;
 
-            results = new ResultInterface[qrys];
-            resultSets = new ResultSet[qrys];
-            pages = new int[qrys];
+            results = new AtomicReferenceArray<>(qrys);
+        }
+
+        /**
+         * @param qry Query result index.
+         * @return Query result.
+         */
+        QueryResult result(int qry) {
+            return results.get(qry);
+        }
+
+        /**
+         * @param qry Query result index.
+         * @param rs Result set.
+         */
+        void addResult(int qry, ResultSet rs) {
+            if (!results.compareAndSet(qry, null, new QueryResult(rs)))
+                throw new IllegalStateException();
+        }
+
+        /**
+         * @return {@code true} If all results are closed.
+         */
+        boolean isAllClosed() {
+            for (int i = 0; i < results.length(); i++) {
+                QueryResult res = results.get(i);
+
+                if (res == null || !res.closed)
+                    return false;
+            }
+
+            return true;
+        }
+
+        void cancel() {
+            if (canceled)
+                return;
+
+            canceled = true;
+
+            for (int i = 0; i < results.length(); i++) {
+                QueryResult res = results.get(i);
+
+                if (res != null)
+                    res.close();
+            }
+        }
+    }
+
+    /**
+     * Result for a single part of the query: hands out rows page by page and closes the result set with its statement.
+     */
+    private class QueryResult implements AutoCloseable {
+        /** H2 result taken reflectively from {@link #rs} via {@code RESULT_FIELD}; rows are read from it. */
+        private final ResultInterface res;
+
+        /** JDBC result set this result was created from; closed together with its statement in {@link #close()}. */
+        private final ResultSet rs;
+
+        /** Page counter, incremented by {@link #fetchNextPage}. NOTE(review): sendNextPage reads it without synchronization - confirm this is safe. */
+        private int page;
+
+        /** Value of {@code res.getRowCount()} captured in the constructor. */
+        private final int rowCount;
+
+        /** Set by {@link #close()}; once set, {@link #fetchNextPage} returns {@code true} without reading rows. */
+        private volatile boolean closed;
+
+        /**
+         * @param rs Result set; the underlying H2 result is extracted from it and its row count is captured.
+         */
+        private QueryResult(ResultSet rs) {
+            this.rs = rs;
+
+            try {
+                res = (ResultInterface)RESULT_FIELD.get(rs);
+            }
+            catch (IllegalAccessException e) {
+                throw new IllegalStateException(e); // Must not happen.
+            }
+
+            rowCount = res.getRowCount();
+        }
+
+        /**
+         * @param rows Collection to fetch into; at most {@code pageSize} rows are appended.
+         * @param pageSize Page size.
+         * @return {@code true} If there are no more rows available or this result is already closed.
+         */
+        synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+            if (closed)
+                return true;
+
+            page++;
+
+            for (int i = 0 ; i < pageSize; i++) {
+                if (!res.next())
+                    return true;
+
+                rows.add(res.currentRow());
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} Only the first call has an effect: it closes the result set and the statement that produced it. */
+        @Override public synchronized void close() {
+            if (closed)
+                return;
+
+            closed = true;
+
+            Statement stmt;
+
+            try {
+                stmt = rs.getStatement();
+            }
+            catch (SQLException e) {
+                throw new IllegalStateException(e); // Must not happen.
+            }
+
+            U.close(rs, log);
+            U.close(stmt, log);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 71cc151..4632e0b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -112,13 +112,20 @@ public abstract class GridMergeIndex extends BaseIndex {
         if (fetched == null)
             throw new IgniteException("Fetched result set was too large.");
 
-        if (fetched.size() == cnt.get())  // We've fetched all the rows.
+        if (fetchedAll())
             return findAllFetched(fetched, first, last);
 
         return findInStream(first, last);
     }
 
     /**
+     * @return {@code true} If we have fetched all the remote rows ({@code fetched.size()} equals {@code cnt.get()}).
+     */
+    public boolean fetchedAll() {
+        return fetched.size() == cnt.get(); // NOTE(review): no null check on 'fetched', although the code above tests it for null - confirm it cannot be null here.
+    }
+
+    /**
      * @param first First row.
      * @param last Last row.
      * @return Cursor. Usually it must be {@link FetchingCursor} instance.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 9aec279..2804d90 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -164,7 +164,7 @@ public class GridReduceQueryExecutor {
         r.conn = h2.connectionForSpace(space);
 
         // TODO Add topology version.
-        Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(space).nodes();
+        final Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(space).nodes();
 
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
             GridMergeTable tbl;
@@ -198,8 +198,12 @@ public class GridReduceQueryExecutor {
 
             final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, 
rdc.query(), F.asList(rdc.parameters()));
 
-            for (GridMergeTable tbl : r.tbls)
+            for (GridMergeTable tbl : r.tbls) {
+                if (!tbl.getScanIndex(null).fetchedAll()) // We have to 
explicitly cancel queries on remote nodes.
+                    ctx.io().sendUserMessage(nodes, new 
GridQueryCancelRequest(qryReqId), GridTopic.TOPIC_QUERY, false, 0);
+
                 dropTable(r.conn, tbl.getName());
+            }
 
             return new QueryCursorImpl<>(new Iter(res));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
new file mode 100644
index 0000000..9bcb410
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Cancel request: a serializable message carrying the ID of the query request that has to be canceled.
+ */
+public class GridQueryCancelRequest implements Serializable { // NOTE(review): no serialVersionUID is declared - confirm this is intended.
+    /** ID of the query request to cancel. */
+    private long qryReqId;
+
+    /**
+     * @param qryReqId ID of the query request to cancel.
+     */
+    public GridQueryCancelRequest(long qryReqId) {
+        this.qryReqId = qryReqId;
+    }
+
+    /**
+     * @return ID of the query request to cancel, as passed to the constructor.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryCancelRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9424b9cd/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 8445912..2c38f4e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -70,6 +70,8 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         ignite = startGridsMultiThreaded(3);
+
+        fillCaches();
     }
 
     /** {@inheritDoc} */
@@ -117,8 +119,6 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTwoStep() throws Exception {
-        fillCaches();
-
         String cache = "partitioned";
 
         GridCacheQueriesEx<Integer, FactPurchase> qx =
@@ -141,8 +141,6 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTwoStepGroupAndAggregates() throws Exception {
-        fillCaches();
-
         GridCacheQueriesEx<Integer, FactPurchase> qx =
             (GridCacheQueriesEx<Integer, FactPurchase>)((IgniteKernal)ignite)
                 .<Integer, FactPurchase>cache("partitioned").queries();
@@ -235,8 +233,6 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testApiQueries() throws Exception {
-        fillCaches();
-
         IgniteCache<Object,Object> c = ignite.jcache("partitioned");
 
         c.queryFields(new SqlFieldsQuery("select cast(? as varchar) from 
FactPurchase").setArgs("aaa")).getAll();
@@ -249,6 +245,34 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
         assertEquals(8, res.get(0).get(1));
     }
 
+    public void _testLoop() throws InterruptedException { // Long-running manual loop. NOTE(review): the '_' prefix presumably keeps the runner from picking it up - confirm.
+        IgniteCache<Object,Object> c = ignite.jcache("partitioned");
+
+        X.println("___ GET READY");
+
+        Thread.sleep(20000); // 20 s pause before the loop starts.
+
+        X.println("___ GO");
+
+        long start = System.currentTimeMillis();
+
+        for (int i = 0; i < 1000000; i++) {
+            if (i % 1000 == 0) { // Every 1000 iterations print the time spent on the previous batch.
+                long t = System.currentTimeMillis();
+
+                X.println("__ " + i + " -> " + (t - start));
+
+                start = t; // Restart the timer for the next batch.
+            }
+
+            c.queryFields(new SqlFieldsQuery("select * from 
FactPurchase")).getAll();
+        }
+
+        X.println("___ OK");
+
+        Thread.sleep(300000); // 5 min pause after the loop has finished.
+    }
+
     /**
      * @param l List.
      * @param idx Index.
@@ -260,8 +284,6 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
 
     /** @throws Exception If failed. */
     public void testOnProjection() throws Exception {
-        fillCaches();
-
         CacheProjection<Integer, FactPurchase> prj = ((IgniteKernal)ignite)
             .<Integer, FactPurchase>cache("partitioned").projection(
             new IgnitePredicate<Cache.Entry<Integer, FactPurchase>>() {

Reply via email to