Repository: incubator-ignite Updated Branches: refs/heads/ignite-59 6badd2bac -> 210845df4
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java deleted file mode 100644 index b6e6f33..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ /dev/null @@ -1,252 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.sql; - -import org.apache.ignite.*; -import org.gridgain.grid.kernal.processors.cache.query.*; - -import java.sql.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.query.h2.sql.GridSqlFunctionType.*; - -/** - * Splits a single SQL query into two step map-reduce query. - */ -public class GridSqlQuerySplitter { - /** */ - private static final String TABLE_PREFIX = "__T"; - - /** */ - private static final String COLUMN_PREFIX = "__C"; - - /** - * @param idx Index of table. - * @return Table name. - */ - private static String table(int idx) { - return TABLE_PREFIX + idx; - } - - /** - * @param idx Index of column. - * @return Generated by index column alias. - */ - private static String columnName(int idx) { - return COLUMN_PREFIX + idx; - } - - /** - * @param conn Connection. - * @param query Query. - * @param params Parameters. - * @return Two step query. - */ - public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) { - // TODO possibly get column types from query. - GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query); - - if (srcQry.groups().isEmpty()) { // Simple case. - String tbl0 = table(0); - - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery("select * from " + tbl0); - - res.addMapQuery(tbl0, srcQry.getSQL(), params); - - return res; - } - - // Split all select expressions into map-reduce parts. - List<GridSqlElement> mapExps = new ArrayList<>(srcQry.allExpressions()); - - GridSqlElement[] rdcExps = new GridSqlElement[srcQry.select().size()]; - - for (int i = 0, len = mapExps.size(); i < len; i++) - splitSelectExpression(mapExps, rdcExps, i); - - // Build map query. - GridSqlSelect mapQry = srcQry.clone(); - - mapQry.clearSelect(); - - for (GridSqlElement exp : mapExps) - mapQry.addSelectExpression(exp); - - mapQry.clearGroups(); - - for (int col : srcQry.groupColumns()) - mapQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias())); - - // TODO sort support - - // Reduce query. - GridSqlSelect rdcQry = new GridSqlSelect(); - - for (GridSqlElement rdcExp : rdcExps) - rdcQry.addSelectExpression(rdcExp); - - rdcQry.from(new GridSqlTable(null, table(0))); - - for (int col : srcQry.groupColumns()) - rdcQry.addGroupExpression(column(((GridSqlAlias)mapExps.get(col)).alias())); - - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(rdcQry.getSQL()); - - res.addMapQuery(table(0), mapQry.getSQL(), params); - - return res; - } - - /** - * @param mapSelect Selects for map query. - * @param rdcSelect Selects for reduce query. - * @param idx Index. - */ - private static void splitSelectExpression(List<GridSqlElement> mapSelect, GridSqlElement[] rdcSelect, int idx) { - GridSqlElement el = mapSelect.get(idx); - - GridSqlAlias alias = null; - - if (el instanceof GridSqlAlias) { // Unwrap from alias. - alias = (GridSqlAlias)el; - el = alias.child(); - } - - if (el instanceof GridSqlAggregateFunction) { - GridSqlAggregateFunction agg = (GridSqlAggregateFunction)el; - - GridSqlElement mapAgg, rdcAgg; - - String mapAggAlias = columnName(idx); - - switch (agg.type()) { - case AVG: // SUM( AVG(CAST(x AS DECIMAL))*COUNT(x) )/SUM( COUNT(x) ). - //-- COUNT(x) map - GridSqlElement cntMapAgg = aggregate(agg.distinct(), COUNT).addChild(agg.child()); - - // Add generated alias to COUNT(x). - // Using size as index since COUNT will be added as the last select element to the map query. - String cntMapAggAlias = columnName(mapSelect.size()); - - cntMapAgg = alias(cntMapAggAlias, cntMapAgg); - - mapSelect.add(cntMapAgg); - - //-- AVG(CAST(x AS DECIMAL)) map - mapAgg = aggregate(agg.distinct(), AVG).addChild( // Add function argument. - function(CAST).setCastType("DECIMAL").addChild(agg.child())); - - //-- SUM( AVG(x)*COUNT(x) )/SUM( COUNT(x) ) reduce - GridSqlElement sumUpRdc = aggregate(false, SUM).addChild( - op(GridSqlOperationType.MULTIPLY, - column(mapAggAlias), - column(cntMapAggAlias))); - - GridSqlElement sumDownRdc = aggregate(false, SUM).addChild(column(cntMapAggAlias)); - - rdcAgg = op(GridSqlOperationType.DIVIDE, sumUpRdc, sumDownRdc); - - break; - - case SUM: // SUM( SUM(x) ) - case MAX: // MAX( MAX(x) ) - case MIN: // MIN( MIN(x) ) - mapAgg = aggregate(agg.distinct(), agg.type()).addChild(agg.child()); - - rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias)); - - break; - - case COUNT_ALL: // CAST(SUM( COUNT(*) ) AS BIGINT) - case COUNT: // CAST(SUM( COUNT(x) ) AS BIGINT) - mapAgg = aggregate(agg.distinct(), agg.type()); - - if (agg.type() == COUNT) - mapAgg.addChild(agg.child()); - - rdcAgg = aggregate(false, SUM).addChild(column(mapAggAlias)); - - rdcAgg = function(CAST).setCastType("BIGINT").addChild(rdcAgg); - - break; - - default: - throw new IgniteException("Unsupported aggregate: " + agg.type()); - } - - assert !(mapAgg instanceof GridSqlAlias); - - // Add generated alias to map aggregate. - mapAgg = alias(mapAggAlias, mapAgg); - - if (alias != null) // Add initial alias if it was set. - rdcAgg = alias(alias.alias(), rdcAgg); - - // Set map and reduce aggregates to their places in selects. - mapSelect.set(idx, mapAgg); - - rdcSelect[idx] = rdcAgg; - } - else { - if (alias == null) { // Generate alias if none. - alias = alias(columnName(idx), mapSelect.get(idx)); - - mapSelect.set(idx, alias); - } - - if (idx < rdcSelect.length) - rdcSelect[idx] = column(alias.alias()); - } - } - - /** - * @param distinct Distinct. - * @param type Type. - * @return Aggregate function. - */ - private static GridSqlAggregateFunction aggregate(boolean distinct, GridSqlFunctionType type) { - return new GridSqlAggregateFunction(distinct, type); - } - - /** - * @param name Column name. - * @return Column. - */ - private static GridSqlColumn column(String name) { - return new GridSqlColumn(null, name, name); - } - - /** - * @param alias Alias. - * @param child Child. - * @return Alias. - */ - private static GridSqlAlias alias(String alias, GridSqlElement child) { - return new GridSqlAlias(alias, child); - } - - /** - * @param type Type. - * @param left Left expression. - * @param right Right expression. - * @return Binary operator. - */ - private static GridSqlOperation op(GridSqlOperationType type, GridSqlElement left, GridSqlElement right) { - return new GridSqlOperation(type, left, right); - } - - /** - * @param type Type. - * @return Function. - */ - private static GridSqlFunction function(GridSqlFunctionType type) { - return new GridSqlFunction(type); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java deleted file mode 100644 index 500c90c..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java +++ /dev/null @@ -1,287 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.sql; - -import org.h2.result.*; -import org.h2.util.*; - -import java.util.*; - -/** - * Select query. - */ -public class GridSqlSelect implements Cloneable { - /** */ - private boolean distinct; - - /** */ - private List<GridSqlElement> allExprs; - - /** */ - private List<GridSqlElement> select = new ArrayList<>(); - - /** */ - private List<GridSqlElement> groups = new ArrayList<>(); - - /** */ - private int[] grpCols; - - /** */ - private GridSqlElement from; - - /** */ - private GridSqlElement where; - - /** */ - private GridSqlElement having; - - /** */ - private Map<GridSqlElement, Integer> sort = new LinkedHashMap<>(); - - /** - * @return Distinct. - */ - public boolean distinct() { - return distinct; - } - - /** - * @param distinct New distinct. - */ - public void distinct(boolean distinct) { - this.distinct = distinct; - } - - /** - * @return Generate sql. - */ - public String getSQL() { - StatementBuilder buff = new StatementBuilder("SELECT"); - - if (distinct) - buff.append(" DISTINCT"); - - for (GridSqlElement expression : select) { - buff.appendExceptFirst(","); - buff.append('\n'); - buff.append(StringUtils.indent(expression.getSQL(), 4, false)); - } - - buff.append("\nFROM ").append(from.getSQL()); - - if (where != null) - buff.append("\nWHERE ").append(StringUtils.unEnclose(where.getSQL())); - - if (!groups.isEmpty()) { - buff.append("\nGROUP BY "); - - buff.resetCount(); - - for (GridSqlElement expression : groups) { - buff.appendExceptFirst(", "); - - if (expression instanceof GridSqlAlias) - buff.append(StringUtils.unEnclose((expression.child().getSQL()))); - else - buff.append(StringUtils.unEnclose(expression.getSQL())); - } - } - - if (having != null) - buff.append("\nHAVING ").append(StringUtils.unEnclose(having.getSQL())); - - if (!sort.isEmpty()) { - buff.append("\nORDER BY "); - - buff.resetCount(); - - for (Map.Entry<GridSqlElement, Integer> entry : sort.entrySet()) { - buff.appendExceptFirst(", "); - - GridSqlElement expression = entry.getKey(); - - int idx = select.indexOf(expression); - - if (idx >= 0) - buff.append(idx + 1); - else - buff.append('=').append(StringUtils.unEnclose(expression.getSQL())); - - int type = entry.getValue(); - - if ((type & SortOrder.DESCENDING) != 0) - buff.append(" DESC"); - - if ((type & SortOrder.NULLS_FIRST) != 0) - buff.append(" NULLS FIRST"); - else if ((type & SortOrder.NULLS_LAST) != 0) - buff.append(" NULLS LAST"); - } - } - - return buff.toString(); - } - - /** - * @param expression Expression. - */ - public void addExpression(GridSqlElement expression) { - if (allExprs == null) - allExprs = new ArrayList<>(); - - allExprs.add(expression); - } - - /** - * @return All expressions in select, group by, order by. - */ - public List<GridSqlElement> allExpressions() { - return allExprs; - } - - /** - * @return Expressions. - */ - public List<GridSqlElement> select() { - return select; - } - - /** - * Clears select list. - */ - public void clearSelect() { - select = new ArrayList<>(); - } - - /** - * @param expression Expression. - */ - public void addSelectExpression(GridSqlElement expression) { - select.add(expression); - } - - /** - * @return Expressions. - */ - public List<GridSqlElement> groups() { - return groups; - } - - /** - * - */ - public void clearGroups() { - groups = new ArrayList<>(); - grpCols = null; - } - - /** - * @param expression Expression. - */ - public void addGroupExpression(GridSqlElement expression) { - groups.add(expression); - } - - /** - * @return Group columns. - */ - public int[] groupColumns() { - return grpCols; - } - - /** - * @param grpCols Group columns. - */ - public void groupColumns(int[] grpCols) { - this.grpCols = grpCols; - } - - /** - * @return Tables. - */ - public GridSqlElement from() { - return from; - } - - /** - * @param from From element. - */ - public void from(GridSqlElement from) { - this.from = from; - } - - /** - * @return Where. - */ - public GridSqlElement where() { - return where; - } - - /** - * @param where New where. - */ - public void where(GridSqlElement where) { - this.where = where; - } - - /** - * @return Having. - */ - public GridSqlElement having() { - return having; - } - - /** - * @param having New having. - */ - public void having(GridSqlElement having) { - this.having = having; - } - - /** - * @return Sort. - */ - public Map<GridSqlElement, Integer> sort() { - return sort; - } - - /** - * - */ - public void clearSort() { - sort = new LinkedHashMap<>(); - } - - /** - * @param expression Expression. - * @param sortType The sort type bit mask (SortOrder.DESCENDING, SortOrder.NULLS_FIRST, SortOrder.NULLS_LAST). - */ - public void addSort(GridSqlElement expression, int sortType) { - sort.put(expression, sortType); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneCallsConstructors", "CloneDoesntDeclareCloneNotSupportedException"}) - @Override public GridSqlSelect clone() { - try { - GridSqlSelect res = (GridSqlSelect)super.clone(); - - res.select = new ArrayList<>(select); - res.groups = new ArrayList<>(groups); - res.sort = new LinkedHashMap<>(sort); - res.allExprs = null; - - return res; - } - catch (CloneNotSupportedException e) { - throw new RuntimeException(e); // Never thrown. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSubquery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSubquery.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSubquery.java deleted file mode 100644 index 834f1f3..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSubquery.java +++ /dev/null @@ -1,44 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.sql; - -/** - * Subquery. - */ -public class GridSqlSubquery extends GridSqlElement { - /** */ - private GridSqlSelect select; - - /** - * @param select Select. - */ - public GridSqlSubquery(GridSqlSelect select) { - this.select = select; - } - - /** {@inheritDoc} */ - @Override public String getSQL() { - return "(" + select.getSQL() + ")"; - } - - /** - * @return Select. - */ - public GridSqlSelect select() { - return select; - } - - /** - * @param select New select. - */ - public void select(GridSqlSelect select) { - this.select = select; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlTable.java deleted file mode 100644 index 497fbc9..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlTable.java +++ /dev/null @@ -1,55 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.sql; - -import org.h2.command.*; -import org.jetbrains.annotations.*; - -/** - * Table with optional schema. - */ -public class GridSqlTable extends GridSqlElement { - /** */ - private final String schema; - - /** */ - private final String tblName; - - /** - * @param schema Schema. - * @param tblName Table name. - */ - public GridSqlTable(@Nullable String schema, String tblName) { - this.schema = schema; - this.tblName = tblName; - } - - /** {@inheritDoc} */ - @Override public String getSQL() { - if (schema == null) - return Parser.quoteIdentifier(tblName); - - return Parser.quoteIdentifier(schema) + '.' + Parser.quoteIdentifier(tblName); - } - - /** - * @return Schema. - */ - public String schema() { - return schema; - } - - /** - * @return Table name. - */ - public String tableName() { - return tblName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlValue.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlValue.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlValue.java deleted file mode 100644 index 62765e7..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlValue.java +++ /dev/null @@ -1,17 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.sql; - -/** - * Marker interface for a simple value. - */ -public interface GridSqlValue { - // No-op. -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java deleted file mode 100644 index 1f27150..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ /dev/null @@ -1,270 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.query.h2.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.indexing.*; -import org.gridgain.grid.kernal.processors.cache.query.*; -import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; -import org.h2.jdbc.*; -import org.h2.result.*; -import org.h2.value.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.lang.reflect.*; -import java.sql.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Map query executor. - */ -public class GridMapQueryExecutor { - /** */ - private static final Field RESULT_FIELD; - - /** - * Initialize. - */ - static { - try { - RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result"); - - RESULT_FIELD.setAccessible(true); - } - catch (NoSuchFieldException e) { - throw new IllegalStateException("Check H2 version in classpath.", e); - } - } - - /** */ - private IgniteLogger log; - - /** */ - private GridKernalContext ctx; - - /** */ - private IgniteH2Indexing h2; - - /** */ - private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>(); - - /** - * @param ctx Context. - * @param h2 H2 Indexing. - * @throws IgniteCheckedException If failed. - */ - public void start(final GridKernalContext ctx, IgniteH2Indexing h2) throws IgniteCheckedException { - this.ctx = ctx; - this.h2 = h2; - - log = ctx.log(GridMapQueryExecutor.class); - - // TODO handle node failures. - - ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - assert msg != null; - - ClusterNode node = ctx.discovery().node(nodeId); - - if (msg instanceof GridQueryRequest) - executeLocalQuery(node, (GridQueryRequest)msg); - else if (msg instanceof GridNextPageRequest) - sendNextPage(node, (GridNextPageRequest)msg); - - return true; - } - }); - } - - /** - * @param node Node. - * @param req Query request. - */ - private void executeLocalQuery(ClusterNode node, GridQueryRequest req) { - h2.setFilters(new GridIndexingQueryFilter() { - @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { - final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); - - if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) - return null; - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); - } - }; - } - }); - - try { - QueryResults qr = new QueryResults(req.requestId(), req.queries().size()); - - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); - - if (nodeRess == null) { - nodeRess = new ConcurrentHashMap8<>(); - - ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess); - - if (old != null) - nodeRess = old; - } - - QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr); - - assert old == null; - - // Prepare snapshots for all the needed tables before actual run. - for (GridCacheSqlQuery qry : req.queries()) { - // TODO - } - - // Run queries. - int i = 0; - - for (GridCacheSqlQuery qry : req.queries()) { - ResultSet rs = h2.executeSqlQueryWithTimer(h2.connectionForSpace(null), qry.query(), - F.asList(qry.parameters())); - - assert rs instanceof JdbcResultSet : rs.getClass(); - - ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs); - - qr.results[i] = res; - qr.resultSets[i] = rs; - - // Send the first page. - sendNextPage(node, qr, i, req.pageSize(), res.getRowCount()); - - i++; - } - } - catch (Throwable e) { - sendError(node, req.requestId(), e); - } - finally { - h2.setFilters(null); - } - } - - /** - * @param node Node. - * @param qryReqId Query request ID. - * @param err Error. - */ - private void sendError(ClusterNode node, long qryReqId, Throwable err) { - try { - ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err)); - } - catch (IgniteCheckedException e) { - e.addSuppressed(err); - - log.error("Failed to send error message.", e); - } - } - - /** - * @param node Node. - * @param req Request. - */ - private void sendNextPage(ClusterNode node, GridNextPageRequest req) { - ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id()); - - QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId()); - - if (qr == null) - sendError(node, req.queryRequestId(), - new IllegalStateException("No query result found for request: " + req)); - else - sendNextPage(node, qr, req.query(), req.pageSize(), -1); - } - - /** - * @param node Node. - * @param qr Query results. - * @param qry Query. - * @param pageSize Page size. - * @param allRows All rows count. - */ - private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) { - int page; - - List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize)); - - ResultInterface res = qr.results[qry]; - - assert res != null; - - boolean last = false; - - synchronized (res) { - page = qr.pages[qry]++; - - for (int i = 0 ; i < pageSize; i++) { - if (!res.next()) { - last = true; - - break; - } - - rows.add(res.currentRow()); - } - } - - try { - ctx.io().sendUserMessage(F.asList(node), - new GridNextPageResponse(qr.qryReqId, qry, page, allRows, last, rows), - GridTopic.TOPIC_QUERY, false, 0); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message.", e); - - throw new IgniteException(e); - } - } - - /** - * - */ - private static class QueryResults { - /** */ - private long qryReqId; - - /** */ - private ResultInterface[] results; - - /** */ - private ResultSet[] resultSets; - - /** */ - private int[] pages; - - /** - * @param qryReqId Query request ID. - * @param qrys Queries. - */ - private QueryResults(long qryReqId, int qrys) { - this.qryReqId = qryReqId; - - results = new ResultInterface[qrys]; - resultSets = new ResultSet[qrys]; - pages = new int[qrys]; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java deleted file mode 100644 index 707d54e..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java +++ /dev/null @@ -1,290 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.apache.ignite.*; -import org.h2.engine.*; -import org.h2.index.*; -import org.h2.message.*; -import org.h2.result.*; -import org.h2.table.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Merge index. - */ -public abstract class GridMergeIndex extends BaseIndex { - /** */ - protected final GridResultPage<?> END = new GridResultPage<Object>(null, null); - - /** */ - private static final int MAX_FETCH_SIZE = 100000; - - /** */ - private final AtomicInteger cnt = new AtomicInteger(0); - - /** Result sources. */ - private final AtomicInteger srcs = new AtomicInteger(0); - - /** - * Will be r/w from query execution thread only, does not need to be threadsafe. - */ - private ArrayList<Row> fetched = new ArrayList<>(); - - /** - * @param tbl Table. - * @param name Index name. - * @param type Type. - * @param cols Columns. - */ - public GridMergeIndex(GridMergeTable tbl, String name, IndexType type, IndexColumn[] cols) { - initBaseIndex(tbl, 0, name, cols, type); - } - - /** {@inheritDoc} */ - @Override public long getRowCount(Session session) { - return cnt.get(); - } - - /** {@inheritDoc} */ - @Override public long getRowCountApproximation() { - return getRowCount(null); - } - - /** - * @param srcs Number of sources. - */ - public void setNumberOfSources(int srcs) { - this.srcs.set(srcs); - } - - /** - * @param cnt Count. - */ - public void addCount(int cnt) { - this.cnt.addAndGet(cnt); - } - - /** - * @param page Page. - */ - public final void addPage(GridResultPage<?> page) { - if (!page.response().rows().isEmpty()) - addPage0(page); - else - assert page.response().isLast(); - - if (page.response().isLast()) { - int srcs0 = srcs.decrementAndGet(); - - assert srcs0 >= 0; - - if (srcs0 == 0) - addPage0(END); // We've fetched all. - } - } - - /** - * @param page Page. - */ - protected abstract void addPage0(GridResultPage<?> page); - - /** {@inheritDoc} */ - @Override public Cursor find(Session session, SearchRow first, SearchRow last) { - if (fetched == null) - throw new IgniteException("Fetched result set was too large."); - - if (fetched.size() == cnt.get()) // We've fetched all the rows. - return findAllFetched(fetched, first, last); - - return findInStream(first, last); - } - - /** - * @param first First row. - * @param last Last row. - * @return Cursor. Usually it must be {@link FetchingCursor} instance. - */ - protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last); - - /** - * @param fetched Fetched rows. - * @param first First row. - * @param last Last row. - * @return Cursor. - */ - protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) { - return new IteratorCursor(fetched.iterator()); - } - - /** {@inheritDoc} */ - @Override public void checkRename() { - throw DbException.getUnsupportedException("rename"); - } - - /** {@inheritDoc} */ - @Override public void close(Session session) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void add(Session session, Row row) { - throw DbException.getUnsupportedException("add"); - } - - /** {@inheritDoc} */ - @Override public void remove(Session session, Row row) { - throw DbException.getUnsupportedException("remove row"); - } - - /** {@inheritDoc} */ - @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) { - return getRowCountApproximation() + Constants.COST_ROW_OFFSET; - } - - /** {@inheritDoc} */ - @Override public void remove(Session session) { - throw DbException.getUnsupportedException("remove index"); - } - - /** {@inheritDoc} */ - @Override public void truncate(Session session) { - throw DbException.getUnsupportedException("truncate"); - } - - /** {@inheritDoc} */ - @Override public boolean canGetFirstOrLast() { - return false; - } - - /** {@inheritDoc} */ - @Override public Cursor findFirstOrLast(Session session, boolean first) { - throw DbException.getUnsupportedException("findFirstOrLast"); - } - - /** {@inheritDoc} */ - @Override public boolean needRebuild() { - return false; - } - - /** {@inheritDoc} */ - @Override public long getDiskSpaceUsed() { - return 0; - } - - /** - * Cursor over iterator. - */ - protected class IteratorCursor implements Cursor { - /** */ - protected Iterator<Row> iter; - - /** */ - protected Row cur; - - /** - * @param iter Iterator. - */ - public IteratorCursor(Iterator<Row> iter) { - assert iter != null; - - this.iter = iter; - } - - /** {@inheritDoc} */ - @Override public Row get() { - return cur; - } - - /** {@inheritDoc} */ - @Override public SearchRow getSearchRow() { - return get(); - } - - /** {@inheritDoc} */ - @Override public boolean next() { - cur = iter.hasNext() ? iter.next() : null; - - return cur != null; - } - - /** {@inheritDoc} */ - @Override public boolean previous() { - throw DbException.getUnsupportedException("previous"); - } - } - - /** - * Fetching cursor. - */ - protected class FetchingCursor extends IteratorCursor { - /** */ - private Iterator<Row> stream; - - /** - */ - public FetchingCursor(Iterator<Row> stream) { - super(new FetchedIterator()); - - assert stream != null; - - this.stream = stream; - } - - /** {@inheritDoc} */ - @Override public boolean next() { - if (super.next()) { - assert cur != null; - - if (iter == stream && fetched != null) { // Cache fetched rows for reuse. - if (fetched.size() == MAX_FETCH_SIZE) - fetched = null; // Throw away fetched result if it is too large. - else - fetched.add(cur); - } - - return true; - } - - if (iter == stream) // We've fetched the stream. - return false; - - iter = stream; // Switch from cached to stream. - - return next(); - } - } - - /** - * List iterator without {@link ConcurrentModificationException}. - */ - private class FetchedIterator implements Iterator<Row> { - /** */ - private int idx; - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - return fetched != null && idx < fetched.size(); - } - - /** {@inheritDoc} */ - @Override public Row next() { - return fetched.get(idx++); - } - - /** {@inheritDoc} */ - @Override public void remove() { - throw new UnsupportedOperationException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java deleted file mode 100644 index 42f9d33..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ /dev/null @@ -1,85 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.apache.ignite.*; -import org.h2.index.*; -import org.h2.result.*; -import org.h2.table.*; -import org.h2.value.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Unsorted merge index. - */ -public class GridMergeIndexUnsorted extends GridMergeIndex { - /** */ - private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>(); - - /** - * @param tbl Table. - * @param name Index name. - */ - public GridMergeIndexUnsorted(GridMergeTable tbl, String name) { - super(tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns())); - } - - /** {@inheritDoc} */ - @Override public void addPage0(GridResultPage<?> page) { - queue.add(page); - } - - /** {@inheritDoc} */ - @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { - return new FetchingCursor(new Iterator<Row>() { - /** */ - Iterator<Value[]> iter = Collections.emptyIterator(); - - @Override public boolean hasNext() { - if (iter.hasNext()) - return true; - - GridResultPage<?> page; - - try { - page = queue.take(); - } - catch (InterruptedException e) { - throw new IgniteException("Query execution was interrupted.", e); - } - - if (page == END) { - assert queue.isEmpty() : "It must be the last page: " + queue; - - return false; // We are done. - } - - page.fetchNextPage(); - - iter = page.response().rows().iterator(); - - assert iter.hasNext(); - - return true; - } - - @Override public Row next() { - return new Row(iter.next(), 0); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java deleted file mode 100644 index 683bb54..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java +++ /dev/null @@ -1,178 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.h2.api.*; -import org.h2.command.ddl.*; -import org.h2.engine.*; -import org.h2.index.*; -import org.h2.message.*; -import org.h2.result.*; -import org.h2.table.*; - -import java.util.*; - -/** - * Merge table for distributed queries. - */ -public class GridMergeTable extends TableBase { - /** */ - private final ArrayList<Index> idxs = new ArrayList<>(1); - - /** */ - private final GridMergeIndex idx; - - /** - * @param data Data. - */ - public GridMergeTable(CreateTableData data) { - super(data); - - idx = new GridMergeIndexUnsorted(this, "merge_scan"); - - idxs.add(idx); - } - - /** {@inheritDoc} */ - @Override public void lock(Session session, boolean exclusive, boolean force) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void close(Session ses) { - idx.close(ses); - } - - /** {@inheritDoc} */ - @Override public void unlock(Session s) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols, - IndexType indexType, boolean create, String indexComment) { - throw DbException.getUnsupportedException("addIndex"); - } - - /** {@inheritDoc} */ - @Override public void removeRow(Session session, Row row) { - throw DbException.getUnsupportedException("removeRow"); - } - - /** {@inheritDoc} */ - @Override public void truncate(Session session) { - throw DbException.getUnsupportedException("truncate"); - } - - /** {@inheritDoc} */ - @Override public void addRow(Session session, Row row) { - throw DbException.getUnsupportedException("addRow"); - } - - /** {@inheritDoc} */ - @Override public void checkSupportAlter() { - throw DbException.getUnsupportedException("alter"); - } - - /** {@inheritDoc} */ - @Override public String getTableType() { - return EXTERNAL_TABLE_ENGINE; - } - - /** {@inheritDoc} */ - @Override public GridMergeIndex getScanIndex(Session session) { - return idx; - } - - /** {@inheritDoc} */ - @Override public Index getUniqueIndex() { - return null; // We don't have a PK. - } - - /** {@inheritDoc} */ - @Override public ArrayList<Index> getIndexes() { - return idxs; - } - - /** {@inheritDoc} */ - @Override public boolean isLockedExclusively() { - return false; - } - - /** {@inheritDoc} */ - @Override public long getMaxDataModificationId() { - return 0; - } - - /** {@inheritDoc} */ - @Override public boolean isDeterministic() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean canGetRowCount() { - return true; - } - - /** {@inheritDoc} */ - @Override public boolean canDrop() { - return true; - } - - /** {@inheritDoc} */ - @Override public long getRowCount(Session ses) { - return idx.getRowCount(ses); - } - - /** {@inheritDoc} */ - @Override public long getRowCountApproximation() { - return idx.getRowCountApproximation(); - } - - /** {@inheritDoc} */ - @Override public long getDiskSpaceUsed() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void checkRename() { - throw DbException.getUnsupportedException("rename"); - } - - /** - * Engine. - */ - public static class Engine implements TableEngine { - /** */ - private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>(); - - /** - * @return Created table. - */ - public static GridMergeTable getCreated() { - GridMergeTable tbl = createdTbl.get(); - - assert tbl != null; - - createdTbl.remove(); - - return tbl; - } - - /** {@inheritDoc} */ - @Override public Table createTable(CreateTableData data) { - GridMergeTable tbl = new GridMergeTable(data); - - createdTbl.set(tbl); - - return tbl; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java deleted file mode 100644 index 10691dc..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ /dev/null @@ -1,242 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.query.h2.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.cache.query.*; -import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; -import org.jdk8.backport.*; - -import java.sql.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Reduce query executor. - */ -public class GridReduceQueryExecutor { - /** */ - private GridKernalContext ctx; - - /** */ - private IgniteH2Indexing h2; - - /** */ - private IgniteLogger log; - - /** */ - private final AtomicLong reqIdGen = new AtomicLong(); - - /** */ - private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>(); - - /** - * @param ctx Context. - * @param h2 H2 Indexing. - * @throws IgniteCheckedException If failed. - */ - public void start(final GridKernalContext ctx, IgniteH2Indexing h2) throws IgniteCheckedException { - this.ctx = ctx; - this.h2 = h2; - - log = ctx.log(GridReduceQueryExecutor.class); - - // TODO handle node failure. - - ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - assert msg != null; - - ClusterNode node = ctx.discovery().node(nodeId); - - if (msg instanceof GridNextPageResponse) - onNextPage(node, (GridNextPageResponse)msg); - else if (msg instanceof GridQueryFailResponse) - onFail(node, (GridQueryFailResponse)msg); - - return true; - } - }); - } - - private void onFail(ClusterNode node, GridQueryFailResponse msg) { - U.error(log, "Failed to execute query.", msg.error()); - } - - private void onNextPage(final ClusterNode node, GridNextPageResponse msg) { - final long qryReqId = msg.queryRequestId(); - final int qry = msg.query(); - final int pageSize = msg.rows().size(); - - QueryRun r = runs.get(qryReqId); - - GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - - if (msg.allRows() != -1) { // Only the first page contains row count. - idx.addCount(msg.allRows()); - - r.latch.countDown(); - } - - idx.addPage(new GridResultPage<UUID>(node.id(), msg) { - @Override public void fetchNextPage() { - try { - ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }); - } - - /** - * @param space Space name. - * @param qry Query. - * @return Future. - */ - public IgniteFuture<GridCacheSqlResult> query(String space, GridCacheTwoStepQuery qry) { - long qryReqId = reqIdGen.incrementAndGet(); - - QueryRun r = new QueryRun(); - - r.tbls = new ArrayList<>(qry.mapQueries().size()); - - try { - r.conn = h2.connectionForSpace(space); - } - catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(e); - } - - Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow? - - for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - GridMergeTable tbl; - - try { - tbl = createTable(r.conn, mapQry); - } - catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(e); - } - - tbl.getScanIndex(null).setNumberOfSources(nodes.size()); - - r.tbls.add(tbl); - } - - r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); - - this.runs.put(qryReqId, r); - - try { - ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries()), // TODO conf page size - GridTopic.TOPIC_QUERY, false, 0); - - r.latch.await(); - - GridCacheSqlQuery rdc = qry.reduceQuery(); - - final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters())); - - for (GridMergeTable tbl : r.tbls) - dropTable(r.conn, tbl.getName()); - - return new GridFinishedFuture(ctx, new Iter(res)); - } - catch (IgniteCheckedException | InterruptedException | SQLException e) { - U.closeQuiet(r.conn); - - return new GridFinishedFuture<>(ctx, e); - } - } - - /** - * @param conn Connection. - * @param tblName Table name. - * @throws SQLException If failed. - */ - private void dropTable(Connection conn, String tblName) throws SQLException { - try (Statement s = conn.createStatement()) { - s.execute("DROP TABLE " + tblName); - } - } - - /** - * @param conn Connection. - * @param qry Query. - * @return Table. - * @throws IgniteCheckedException If failed. - */ - private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException { - try { - try (PreparedStatement s = conn.prepareStatement( - "CREATE LOCAL TEMPORARY TABLE " + qry.alias() + - " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " + - " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) { - h2.bindParameters(s, F.asList(qry.parameters())); - - s.execute(); - } - - return GridMergeTable.Engine.getCreated(); - } - catch (SQLException e) { - U.closeQuiet(conn); - - throw new IgniteCheckedException(e); - } - } - - /** - * - */ - private static class QueryRun { - /** */ - private List<GridMergeTable> tbls; - - /** */ - private CountDownLatch latch; - - /** */ - private Connection conn; - } - - /** - * - */ - private static class Iter extends GridH2ResultSetIterator<List<?>> implements GridCacheSqlResult { - /** - * @param data Data array. - * @throws IgniteCheckedException If failed. - */ - protected Iter(ResultSet data) throws IgniteCheckedException { - super(data); - } - - /** {@inheritDoc} */ - @Override protected List<?> createRow() { - ArrayList<Object> res = new ArrayList<>(row.length); - - Collections.addAll(res, row); - - return res; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java deleted file mode 100644 index d400414..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java +++ /dev/null @@ -1,59 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*; - -/** - * Page result. - */ -public class GridResultPage<Z> { - /** */ - private final Z src; - - /** */ - private final GridNextPageResponse res; - - /** - * @param src Source. - * @param res Response. - */ - protected GridResultPage(Z src, GridNextPageResponse res) { - this.src = src; - this.res = res; - } - - /** - * @return Result source. - */ - public Z source() { - return src; - } - - /** - * @return Response. - */ - public GridNextPageResponse response() { - return res; - } - - /** - * Request next page. - */ - public void fetchNextPage() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridResultPage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java deleted file mode 100644 index e1eb905..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; - - -import java.io.*; - -/** - * Request to fetch next page. - */ -public class GridNextPageRequest implements Serializable { - /** */ - private long qryReqId; - - /** */ - private int qry; - - /** */ - private int pageSize; - - /** - * @param qryReqId Query request ID. - * @param qry Query. - * @param pageSize Page size. - */ - public GridNextPageRequest(long qryReqId, int qry, int pageSize) { - this.qryReqId = qryReqId; - this.qry = qry; - this.pageSize = pageSize; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Query. - */ - public int query() { - return qry; - } - - /** - * @return Page size. - */ - public int pageSize() { - return pageSize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java deleted file mode 100644 index 1bdd4a2..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java +++ /dev/null @@ -1,180 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.h2.store.*; -import org.h2.value.*; - -import java.io.*; -import java.util.*; - -/** - * Next page response. - */ -public class GridNextPageResponse implements Externalizable { - /** */ - private long qryReqId; - - /** */ - private int qry; - - /** */ - private int page; - - /** */ - private int allRows; - - /** */ - private Collection<Value[]> rows; - - /** */ - private boolean last; - - /** - * For {@link Externalizable}. - */ - public GridNextPageResponse() { - // No-op. - } - - /** - * @param qryReqId Query request ID. - * @param qry Query. - * @param page Page. - * @param allRows All rows count. - * @param last Last row. - * @param rows Rows. - */ - public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, Collection<Value[]> rows) { - assert rows != null; - - this.qryReqId = qryReqId; - this.qry = qry; - this.page = page; - this.allRows = allRows; - this.last = last; - this.rows = rows; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Query. - */ - public int query() { - return qry; - } - - /** - * @return Page. - */ - public int page() { - return page; - } - - /** - * @return All rows. - */ - public int allRows() { - return allRows; - } - - /** - * @return {@code true} If this is the last page. - */ - public boolean isLast() { - return last; - } - - /** - * @return Rows. - */ - public Collection<Value[]> rows() { - return rows; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(qryReqId); - out.writeInt(qry); - out.writeInt(page); - out.writeBoolean(last); - out.writeInt(allRows); - - out.writeInt(rows.size()); - - if (rows.isEmpty()) - return; - - Data data = Data.create(null, 512); - - boolean first = true; - - for (Value[] row : rows) { - if (first) { - out.writeInt(row.length); - - first = false; - } - - for (Value val : row) - data.writeValue(val); - } - - out.writeInt(data.length()); - out.write(data.getBytes(), 0, data.length()); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - qryReqId = in.readLong(); - qry = in.readInt(); - page = in.readInt(); - last = in.readBoolean(); - allRows = in.readInt(); - - int rowCnt = in.readInt(); - - if (rowCnt == 0) - rows = Collections.emptyList(); - else { - rows = new ArrayList<>(rowCnt); - - int cols = in.readInt(); - int dataSize = in.readInt(); - - byte[] dataBytes = new byte[dataSize]; - - in.readFully(dataBytes); - - Data data = Data.create(null, dataBytes); - - for (int r = 0; r < rowCnt; r++) { - Value[] row = new Value[cols]; - - for (int c = 0; c < cols; c++) - row[c] = data.readValue(); - - rows.add(row); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridNextPageResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java deleted file mode 100644 index fe55114..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java +++ /dev/null @@ -1,34 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; - -import java.io.*; - -/** - * TODO write doc - */ -public class GridQueryAck implements Serializable { - /** */ - private long reqId; - - /** - * @param reqId Request ID. - */ - public GridQueryAck(long reqId) { - this.reqId = reqId; - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java deleted file mode 100644 index ba5855e..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ /dev/null @@ -1,46 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; - -import java.io.*; - -/** - * Error message. - */ -public class GridQueryFailResponse implements Serializable { - /** */ - private long qryReqId; - - /** */ - private Throwable err; - - /** - * @param qryReqId Query request ID. - * @param err Error. - */ - public GridQueryFailResponse(long qryReqId, Throwable err) { - this.qryReqId = qryReqId; - this.err = err; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Error. - */ - public Throwable error() { - return err; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java deleted file mode 100644 index 7e2e9ad..0000000 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.query.h2.twostep.messages; - -import org.gridgain.grid.kernal.processors.cache.query.*; - -import java.io.*; -import java.util.*; - -/** - * Query request. - */ -public class GridQueryRequest implements Serializable { - /** */ - private long reqId; - - /** */ - private int pageSize; - - /** */ - private Collection<GridCacheSqlQuery> qrys; - - /** - * @param reqId Request ID. - * @param pageSize Page size. - * @param qrys Queries. - */ - public GridQueryRequest(long reqId, int pageSize, Collection<GridCacheSqlQuery> qrys) { - this.reqId = reqId; - this.pageSize = pageSize; - this.qrys = qrys; - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @return Page size. - */ - public int pageSize() { - return pageSize; - } - - /** - * @return Queries. - */ - public Collection<GridCacheSqlQuery> queries() { - return qrys; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2851b52b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index afbfa28..accb965 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@ -16,7 +16,6 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.gridgain.grid.kernal.processors.query.h2.sql.*; import org.h2.command.*; import org.h2.command.dml.*; import org.h2.engine.*;