# IGNITE-692 Removed not needed holder wrapper. Added missing query cursor close.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f475b07e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f475b07e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f475b07e Branch: refs/heads/ignite-718 Commit: f475b07e3e248b304b40af427aa8cbcdaf2f6eb2 Parents: e101752 Author: AKuznetsov <akuznet...@gridgain.com> Authored: Fri Apr 10 19:19:56 2015 +0700 Committer: AKuznetsov <akuznet...@gridgain.com> Committed: Fri Apr 10 19:19:56 2015 +0700 ---------------------------------------------------------------------- .../visor/query/VisorQueryCleanupTask.java | 10 ++- .../internal/visor/query/VisorQueryCursor.java | 26 +++++++- .../visor/query/VisorQueryCursorHolder.java | 64 -------------------- .../internal/visor/query/VisorQueryJob.java | 52 ++++++++++------ .../visor/query/VisorQueryNextPageTask.java | 38 +++++++----- .../resources/META-INF/classnames.properties | 1 - 6 files changed, 87 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java index d82f5fb..722ad91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCleanupTask.java @@ -86,10 +86,14 @@ public class VisorQueryCleanupTask extends VisorMultiNodeTask<Map<UUID, Collecti /** {@inheritDoc} */ @Override protected Void run(Collection<String> qryIds) { - ConcurrentMap<String, VisorQueryCursorHolder> locMap = ignite.cluster().nodeLocalMap(); + ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap(); - for (String qryId : qryIds) - locMap.remove(qryId); + for (String qryId : qryIds) { + VisorQueryCursor cur = storage.remove(qryId); + + if (cur != null) + cur.close(); + } return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java index 981b297..322b3b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursor.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.visor.query; import org.apache.ignite.cache.query.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; @@ -33,6 +34,9 @@ public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable { /** */ private final Iterator<T> itr; + /** Flag indicating that this cursor was read from last check. */ + private volatile boolean accessed; + /** * @param cur Cursor. */ @@ -58,7 +62,7 @@ public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable { } /** {@inheritDoc} */ - @Override public void close() throws Exception { + @Override public void close() { cur.close(); } @@ -69,4 +73,24 @@ public class VisorQueryCursor<T> implements Iterator<T>, AutoCloseable { public Collection<GridQueryFieldMetadata> fieldsMeta() { return ((QueryCursorImpl)cur).fieldsMeta(); } + + /** + * @return Flag indicating that this future was read from last check.. + */ + public boolean accessed() { + return accessed; + } + + /** + * @param accessed New accessed. + */ + public void accessed(boolean accessed) { + this.accessed = accessed; + } + + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VisorQueryCursor.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursorHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursorHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursorHolder.java deleted file mode 100644 index dec0253..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryCursorHolder.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.visor.query; - -import java.io.*; - -/** - * ResultSet future holder. - */ -public class VisorQueryCursorHolder<T> implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Query cursor. */ - private final VisorQueryCursor<T> cur; - - /** Flag indicating that this future was read from last check. */ - private volatile boolean accessed; - - /** - * @param cur Future. - * @param accessed {@code true} if query was accessed before remove timeout expired. - */ - public VisorQueryCursorHolder(VisorQueryCursor<T> cur, boolean accessed) { - this.cur = cur; - this.accessed = accessed; - } - - /** - * @return Query cursor. - */ - public VisorQueryCursor<T> cursor() { - return cur; - } - - /** - * @return Flag indicating that this future was read from last check.. - */ - public boolean accessed() { - return accessed; - } - - /** - * @param accessed New accessed. - */ - public void accessed(boolean accessed) { - this.accessed = accessed; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java index 2d8b32e..dcc2242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java @@ -82,13 +82,18 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten long duration = U.currentTimeMillis() - start; // Scan duration + fetch duration. - ignite.cluster().<String, VisorQueryCursorHolder>nodeLocalMap().put(qryId, - new VisorQueryCursorHolder<>(cur, false)); + boolean hasNext = cur.hasNext(); - scheduleResultSetHolderRemoval(qryId); + if (hasNext) { + ignite.cluster().<String, VisorQueryCursor>nodeLocalMap().put(qryId, cur); + + scheduleResultSetHolderRemoval(qryId); + } + else + cur.close(); return new IgniteBiTuple<>(null, new VisorQueryResultEx(ignite.localNode().id(), qryId, - SCAN_COL_NAMES, rows, cur.hasNext(), duration)); + SCAN_COL_NAMES, rows, hasNext, duration)); } else { SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt()); @@ -114,13 +119,18 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten long duration = U.currentTimeMillis() - start; // Query duration + fetch duration. - ConcurrentMap<String, VisorQueryCursorHolder<List<?>>> storage = ignite.cluster().nodeLocalMap(); - storage.put(qryId, new VisorQueryCursorHolder<>(cur, false)); + boolean hasNext = cur.hasNext(); - scheduleResultSetHolderRemoval(qryId); + if (hasNext) { + ignite.cluster().<String, VisorQueryCursor<List<?>>>nodeLocalMap().put(qryId, cur); + + scheduleResultSetHolderRemoval(qryId); + } + else + cur.close(); return new IgniteBiTuple<>(null, new VisorQueryResultEx(ignite.localNode().id(), qryId, - names, rows, cur.hasNext(), duration)); + names, rows, hasNext, duration)); } } } @@ -130,24 +140,28 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten } /** - * @param id Unique query result id. + * @param qryId Unique query result id. */ - private void scheduleResultSetHolderRemoval(final String id) { + private void scheduleResultSetHolderRemoval(final String qryId) { ignite.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) { @Override public void onTimeout() { - ConcurrentMap<String, VisorQueryCursorHolder> storage = ignite.cluster().nodeLocalMap(); + ConcurrentMap<String, VisorQueryCursor> storage = ignite.cluster().nodeLocalMap(); - VisorQueryCursorHolder holder = storage.get(id); + VisorQueryCursor cur = storage.get(qryId); - if (holder != null) { - // If future was accessed since last scheduling, set access flag to false and reschedule. - if (holder.accessed()) { - holder.accessed(false); + if (cur != null) { + // If cursor was accessed since last scheduling, set access flag to false and reschedule. + if (cur.accessed()) { + cur.accessed(false); - scheduleResultSetHolderRemoval(id); + scheduleResultSetHolderRemoval(qryId); + } + else { + // Remove stored cursor otherwise. + storage.remove(qryId); + + cur.close(); } - else - storage.remove(id); // Remove stored future otherwise. } } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java index 191f44d..a3ff089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryNextPageTask.java @@ -71,23 +71,26 @@ public class VisorQueryNextPageTask extends VisorOneNodeTask<IgniteBiTuple<Strin private VisorQueryResult nextSqlPage(IgniteBiTuple<String, Integer> arg) { long start = U.currentTimeMillis(); - ConcurrentMap<String, VisorQueryCursorHolder<List<?>>> storage = ignite.cluster().nodeLocalMap(); + ConcurrentMap<String, VisorQueryCursor<List<?>>> storage = ignite.cluster().nodeLocalMap(); - VisorQueryCursorHolder<List<?>> holder = storage.get(arg.get1()); + String qryId = arg.get1(); - if (holder == null) - throw new IgniteException("SQL query results are expired."); + VisorQueryCursor<List<?>> cur = storage.get(qryId); - VisorQueryCursor<List<?>> cur = holder.cursor(); + if (cur == null) + throw new IgniteException("SQL query results are expired."); List<Object[]> nextRows = VisorQueryUtils.fetchSqlQueryRows(cur, arg.get2()); boolean hasMore = cur.hasNext(); if (hasMore) - holder.accessed(true); - else - storage.remove(arg.get1()); + cur.accessed(true); + else { + storage.remove(qryId); + + cur.close(); + } return new VisorQueryResult(nextRows, hasMore, U.currentTimeMillis() - start); } @@ -101,23 +104,26 @@ public class VisorQueryNextPageTask extends VisorOneNodeTask<IgniteBiTuple<Strin private VisorQueryResult nextScanPage(IgniteBiTuple<String, Integer> arg) { long start = U.currentTimeMillis(); - ConcurrentMap<String, VisorQueryCursorHolder<Cache.Entry<Object, Object>>> storage = ignite.cluster().nodeLocalMap(); + ConcurrentMap<String, VisorQueryCursor<Cache.Entry<Object, Object>>> storage = ignite.cluster().nodeLocalMap(); - VisorQueryCursorHolder<Cache.Entry<Object, Object>> holder = storage.get(arg.get1()); + String qryId = arg.get1(); - if (holder == null) - throw new IgniteException("Scan query results are expired."); + VisorQueryCursor<Cache.Entry<Object, Object>> cur = storage.get(qryId); - VisorQueryCursor<Cache.Entry<Object, Object>> cur = holder.cursor(); + if (cur == null) + throw new IgniteException("Scan query results are expired."); List<Object[]> rows = VisorQueryUtils.fetchScanQueryRows(cur, arg.get2()); boolean hasMore = cur.hasNext(); if (hasMore) - holder.accessed(true); - else - storage.remove(arg.get1()); + cur.accessed(true); + else { + storage.remove(qryId); + + cur.close(); + } return new VisorQueryResult(rows, hasMore, U.currentTimeMillis() - start); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f475b07e/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 58d3ad3..f0a4052 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1488,7 +1488,6 @@ org.apache.ignite.internal.visor.node.VisorTransactionConfiguration org.apache.ignite.internal.visor.query.VisorQueryArg org.apache.ignite.internal.visor.query.VisorQueryCleanupTask org.apache.ignite.internal.visor.query.VisorQueryCleanupTask$VisorQueryCleanupJob -org.apache.ignite.internal.visor.query.VisorQueryCursorHolder org.apache.ignite.internal.visor.query.VisorQueryField org.apache.ignite.internal.visor.query.VisorQueryJob org.apache.ignite.internal.visor.query.VisorQueryNextPageTask