ignite-484 - v1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7000722c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7000722c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7000722c Branch: refs/heads/ignite-484 Commit: 7000722cf550b4333eded7640d965583f2768bdf Parents: e975b7a Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue May 12 08:19:40 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue May 12 08:19:40 2015 +0300 ---------------------------------------------------------------------- .../affinity/AffinityTopologyVersion.java | 7 - .../messages/GridQueryNextPageResponse.java | 39 ++++- .../query/h2/twostep/GridMapQueryExecutor.java | 115 +++++++++++--- .../h2/twostep/GridReduceQueryExecutor.java | 156 +++++++++++-------- 4 files changed, 223 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java index 77f3359..650c047 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityTopologyVersion.java @@ -76,13 +76,6 @@ public class AffinityTopologyVersion implements Comparable<AffinityTopologyVersi } /** - * @param topVer New topology version. - */ - public void topologyVersion(long topVer) { - this.topVer = topVer; - } - - /** * @return Minor topology version. */ public int minorTopologyVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 4fdc027..c2cca75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -33,6 +33,12 @@ public class GridQueryNextPageResponse implements Message { private static final long serialVersionUID = 0L; /** */ + public static final byte CODE_OK = 0; + + /** */ + public static final byte CODE_RETRY = -1; + + /** */ private long qryReqId; /** */ @@ -55,6 +61,9 @@ public class GridQueryNextPageResponse implements Message { @GridDirectTransient private transient Collection<?> plainRows; + /** Response code. */ + private byte code = CODE_OK; + /** * For {@link Externalizable}. */ @@ -86,6 +95,20 @@ public class GridQueryNextPageResponse implements Message { } /** + * @return Response code. + */ + public byte code() { + return code; + } + + /** + * @param code Response code. + */ + public void code(byte code) { + this.code = code; + } + + /** * @return Query request ID. */ public long queryRequestId() { @@ -186,6 +209,12 @@ public class GridQueryNextPageResponse implements Message { return false; writer.incrementState(); + + case 6: + if (!writer.writeByte("code", code)) + return false; + + writer.incrementState(); } return true; @@ -247,6 +276,14 @@ public class GridQueryNextPageResponse implements Message { reader.incrementState(); + case 6: + code = reader.readByte("code"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return true; @@ -259,6 +296,6 @@ public class GridQueryNextPageResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index f15a2da..2483912 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -23,7 +23,9 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; @@ -36,6 +38,7 @@ import org.h2.jdbc.*; import org.h2.result.*; import org.h2.store.*; import org.h2.value.*; +import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; @@ -198,6 +201,16 @@ public class GridMapQueryExecutor { } /** + * @param cacheName Cache name. + * @return Cache context or {@code null} if none. + */ + @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) { + GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName); + + return cache == null ? null : cache.context(); + } + + /** * Executing queries locally. * * @param node Node. @@ -206,32 +219,75 @@ public class GridMapQueryExecutor { private void onQueryRequest(ClusterNode node, GridQueryRequest req) { ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); - Collection<GridCacheSqlQuery> qrys; + QueryResults qr = null; + + List<GridDhtLocalPartition> reserved = new ArrayList<>(); try { - qrys = req.queries(); + Collection<GridCacheSqlQuery> qrys; - if (!node.isLocal()) { - Marshaller m = ctx.config().getMarshaller(); + try { + qrys = req.queries(); + + if (!node.isLocal()) { + Marshaller m = ctx.config().getMarshaller(); - for (GridCacheSqlQuery qry : qrys) - qry.unmarshallParams(m); + for (GridCacheSqlQuery qry : qrys) + qry.unmarshallParams(m); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - GridCacheContext<?,?> cctx = ctx.cache().internalCache(req.space()).context(); + List<GridCacheContext<?,?>> cctxs = new ArrayList<>(); - QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx); + for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) { + GridCacheContext<?,?> cctx = cacheContext(cacheName); - if (nodeRess.put(req.requestId(), qr) != null) - throw new IllegalStateException(); + if (cctx == null) { // Cache was not deployed yet. + sendRetry(node, req.requestId()); - h2.setFilters(h2.backupFilter()); + return; + } + else + cctxs.add(cctx); + } + + for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary partitions. + // TODO how to get all partitions for topology version consistently? + List<GridDhtLocalPartition> parts = cctx.topology().localPartitions(); + AffinityTopologyVersion affTopVer = cctx.topology().topologyVersion(); + + if (affTopVer.topologyVersion() != req.topologyVersion()) { + sendRetry(node, req.requestId()); + + return; + } + + for (GridDhtLocalPartition part : parts) { + if (!part.primary(affTopVer)) + continue; + + if (!part.reserve()) { + sendRetry(node, req.requestId()); + + return; + } + + reserved.add(part); + } + } + + GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context. + + qr = new QueryResults(req.requestId(), qrys.size(), cctx); + + if (nodeRess.put(req.requestId(), qr) != null) + throw new IllegalStateException(); + + h2.setFilters(h2.backupFilter()); - try { // TODO Prepare snapshots for all the needed tables before the run. // Run queries. @@ -276,9 +332,11 @@ public class GridMapQueryExecutor { } } catch (Throwable e) { - nodeRess.remove(req.requestId(), qr); + if (qr != null) { + nodeRess.remove(req.requestId(), qr); - qr.cancel(); + qr.cancel(); + } U.error(log, "Failed to execute local query: " + req, e); @@ -289,6 +347,9 @@ public class GridMapQueryExecutor { } finally { h2.setFilters(null); + + for (GridDhtLocalPartition part : reserved) + part.release(); } } @@ -375,6 +436,24 @@ public class GridMapQueryExecutor { } /** + * @param node Node. + * @param reqId Request ID. + * @throws IgniteCheckedException If failed. + */ + private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException { + boolean loc = node.isLocal(); + + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, + /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1, + loc ? null : Collections.<Message>emptyList(), + loc ? Collections.<Value[]>emptyList() : null); + + msg.code(GridQueryNextPageResponse.CODE_RETRY); + + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); + } + + /** * @param bytes Bytes. * @return Rows. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7000722c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 2e69286..3391c97 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -260,6 +260,9 @@ public class GridReduceQueryExecutor { idx.addPage(page); + if (msg.code() == GridQueryNextPageResponse.CODE_RETRY) + r.retry = true; + if (msg.allRows() != -1) // Only the first page contains row count. r.latch.countDown(); } @@ -270,109 +273,123 @@ public class GridReduceQueryExecutor { * @return Cursor. */ public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) { - long qryReqId = reqIdGen.incrementAndGet(); + for (int attempt = 0;; attempt++) { + long qryReqId = reqIdGen.incrementAndGet(); - QueryRun r = new QueryRun(); + QueryRun r = new QueryRun(); - r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); + r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); - r.tbls = new ArrayList<>(qry.mapQueries().size()); + r.tbls = new ArrayList<>(qry.mapQueries().size()); - String space = cctx.name(); + String space = cctx.name(); - r.conn = (JdbcConnection)h2.connectionForSpace(space); + r.conn = (JdbcConnection)h2.connectionForSpace(space); - // TODO Add topology version. - ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); + final long topVer = ctx.cluster().get().topologyVersion(); - if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; + // TODO get projection for this topology version. + ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); - // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. - dataNodes = dataNodes.forRandom(); - } + if (cctx.isReplicated() || qry.explain()) { + assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; - final Collection<ClusterNode> nodes = dataNodes.nodes(); + // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. + dataNodes = dataNodes.forRandom(); + } - for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - GridMergeTable tbl; + final Collection<ClusterNode> nodes = dataNodes.nodes(); - try { - tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { + GridMergeTable tbl; - GridMergeIndex idx = tbl.getScanIndex(null); + try { + tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } - for (ClusterNode node : nodes) - idx.addSource(node.id()); + GridMergeIndex idx = tbl.getScanIndex(null); - r.tbls.add(tbl); + for (ClusterNode node : nodes) + idx.addSource(node.id()); - curFunTbl.set(tbl); - } + r.tbls.add(tbl); - r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); + curFunTbl.set(tbl); + } - runs.put(qryReqId, r); + r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); - try { - Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); + runs.put(qryReqId, r); - if (qry.explain()) { - mapQrys = new ArrayList<>(qry.mapQueries().size()); + try { + Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries(); - for (GridCacheSqlQuery mapQry : qry.mapQueries()) - mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters())); - } + if (qry.explain()) { + mapQrys = new ArrayList<>(qry.mapQueries().size()); - if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. - Marshaller m = ctx.config().getMarshaller(); + for (GridCacheSqlQuery mapQry : qry.mapQueries()) + mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters())); + } - for (GridCacheSqlQuery mapQry : mapQrys) - mapQry.marshallParams(m); - } + if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. + Marshaller m = ctx.config().getMarshaller(); - send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, - ctx.cluster().get().topologyVersion(), - extraSpaces(space, qry.spaces()))); + for (GridCacheSqlQuery mapQry : mapQrys) + mapQry.marshallParams(m); + } + + send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, + extraSpaces(space, qry.spaces()))); - r.latch.await(); + U.await(r.latch); - if (r.rmtErr != null) - throw new CacheException("Failed to run map query remotely.", r.rmtErr); + if (r.rmtErr != null) + throw new CacheException("Failed to run map query remotely.", r.rmtErr); - if (qry.explain()) - return explainPlan(r.conn, space, qry); + ResultSet res = null; - GridCacheSqlQuery rdc = qry.reduceQuery(); + if (!r.retry) { + if (qry.explain()) + return explainPlan(r.conn, space, qry); - final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters())); + GridCacheSqlQuery rdc = qry.reduceQuery(); + + res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters())); + } - for (GridMergeTable tbl : r.tbls) { - if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. - send(nodes, new GridQueryCancelRequest(qryReqId)); + for (GridMergeTable tbl : r.tbls) { + if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. + send(nodes, new GridQueryCancelRequest(qryReqId)); // dropTable(r.conn, tbl.getName()); TODO - } + } - return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); - } - catch (IgniteCheckedException | InterruptedException | RuntimeException e) { - U.closeQuiet(r.conn); + if (r.retry) { + if (attempt > 0) + U.sleep(attempt * 10); + + continue; + } + + return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); + } + catch (IgniteCheckedException | RuntimeException e) { + U.closeQuiet(r.conn); - if (e instanceof CacheException) - throw (CacheException)e; + if (e instanceof CacheException) + throw (CacheException)e; - throw new CacheException("Failed to run reduce query locally.", e); - } - finally { - if (!runs.remove(qryReqId, r)) - U.warn(log, "Query run was already removed: " + qryReqId); + throw new CacheException("Failed to run reduce query locally.", e); + } + finally { + if (!runs.remove(qryReqId, r)) + U.warn(log, "Query run was already removed: " + qryReqId); - curFunTbl.remove(); + curFunTbl.remove(); + } } } @@ -680,6 +697,9 @@ public class GridReduceQueryExecutor { /** */ private volatile CacheException rmtErr; + + /** */ + private volatile boolean retry; } /**