ignite-750 - fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/235843e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/235843e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/235843e6 Branch: refs/heads/ignite-656 Commit: 235843e69f2f1f27458714bc7632cc7d9edfbbaf Parents: 3fdc824 Author: S.Vladykin <[email protected]> Authored: Wed Apr 15 17:13:25 2015 +0300 Committer: S.Vladykin <[email protected]> Committed: Wed Apr 15 17:13:25 2015 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryIndexing.java | 4 ++- .../processors/query/GridQueryProcessor.java | 2 +- .../processors/query/h2/IgniteH2Indexing.java | 6 ++-- .../query/h2/twostep/GridMapQueryExecutor.java | 34 +++++++++++++++++--- .../h2/twostep/GridReduceQueryExecutor.java | 34 +++++++++++++++++--- 5 files changed, 67 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 12f774c..fe029eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -23,6 +23,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.indexing.*; @@ -39,9 +40,10 @@ public interface GridQueryIndexing { * Starts indexing. * * @param ctx Context. + * @param busyLock Busy lock. * @throws IgniteCheckedException If failed. */ - public void start(GridKernalContext ctx) throws IgniteCheckedException; + public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException; /** * Stops indexing. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 7ce894d..35e8d73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -94,7 +94,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { execSvc = ctx.getExecutorService(); - idx.start(ctx); + idx.start(ctx, busyLock); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 7ec1dbe..b05c908 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1109,7 +1109,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @SuppressWarnings("NonThreadSafeLazyInitialization") - @Override public void start(GridKernalContext ctx) throws IgniteCheckedException { + @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Starting cache query index..."); @@ -1161,8 +1161,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { nodeId = ctx.localNodeId(); marshaller = ctx.config().getMarshaller(); - mapQryExec = new GridMapQueryExecutor(); - rdcQryExec = new GridReduceQueryExecutor(); + mapQryExec = new GridMapQueryExecutor(busyLock); + rdcQryExec = new GridReduceQueryExecutor(busyLock); mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 08ad38d..747ccb1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.h2.jdbc.*; @@ -47,7 +48,7 @@ import static org.apache.ignite.events.EventType.*; /** * Map query executor. */ -public class GridMapQueryExecutor implements GridMessageListener { +public class GridMapQueryExecutor { /** */ private static final Field RESULT_FIELD; @@ -77,6 +78,16 @@ public class GridMapQueryExecutor implements GridMessageListener { /** */ private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>(); + /** */ + private final GridSpinBusyLock busyLock; + + /** + * @param busyLock Busy lock. + */ + public GridMapQueryExecutor(GridSpinBusyLock busyLock) { + this.busyLock = busyLock; + } + /** * @param ctx Context. * @param h2 H2 Indexing. @@ -102,11 +113,26 @@ public class GridMapQueryExecutor implements GridMessageListener { } }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); - ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (!busyLock.enterBusy()) + return; + + try { + GridMapQueryExecutor.this.onMessage(nodeId, msg); + } + finally { + busyLock.leaveBusy(); + } + } + }); } - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + /** + * @param nodeId Node ID. + * @param msg Message. + */ + public void onMessage(UUID nodeId, Object msg) { try { assert msg != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index b7edd27..7f42e0d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.sql.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; +import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -56,7 +57,7 @@ import java.util.concurrent.atomic.*; /** * Reduce query executor. */ -public class GridReduceQueryExecutor implements GridMessageListener { +public class GridReduceQueryExecutor { /** */ private GridKernalContext ctx; @@ -100,6 +101,16 @@ public class GridReduceQueryExecutor implements GridMessageListener { } } + /** */ + private final GridSpinBusyLock busyLock; + + /** + * @param busyLock Busy lock. + */ + public GridReduceQueryExecutor(GridSpinBusyLock busyLock) { + this.busyLock = busyLock; + } + /** * @param ctx Context. * @param h2 H2 Indexing. @@ -111,7 +122,19 @@ public class GridReduceQueryExecutor implements GridMessageListener { log = ctx.log(GridReduceQueryExecutor.class); - ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + if (!busyLock.enterBusy()) + return; + + try { + GridReduceQueryExecutor.this.onMessage(nodeId, msg); + } + finally { + busyLock.leaveBusy(); + } + } + }); ctx.event().addLocalEventListener(new GridLocalEventListener() { @Override public void onEvent(final Event evt) { @@ -133,8 +156,11 @@ public class GridReduceQueryExecutor implements GridMessageListener { " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\""); } - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Object msg) { + /** + * @param nodeId Node ID. + * @param msg Message. + */ + public void onMessage(UUID nodeId, Object msg) { try { assert msg != null;
