ignite-1142 - fake thread local tables
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ab5c7e41 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ab5c7e41 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ab5c7e41 Branch: refs/heads/ignite-1142 Commit: ab5c7e4116cbdfb13b08cd3f4bafbc1ffa184926 Parents: cfd1fb2 Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Jul 31 02:15:47 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Jul 31 02:15:47 2015 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheSqlQuery.java | 18 +- .../cache/query/GridCacheTwoStepQuery.java | 19 +- .../processors/query/h2/IgniteH2Indexing.java | 13 +- .../query/h2/sql/GridSqlQuerySplitter.java | 36 +-- .../query/h2/twostep/GridMergeTable.java | 31 --- .../h2/twostep/GridReduceQueryExecutor.java | 214 +++++---------- .../query/h2/twostep/GridThreadLocalTable.java | 262 +++++++++++++++++++ .../IgniteCacheAbstractFieldsQuerySelfTest.java | 2 +- 8 files changed, 354 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index 256fd7c..d5eb379 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@ -39,9 +39,6 @@ public class GridCacheSqlQuery implements Message { public static final Object[] EMPTY_PARAMS = {}; /** */ - private String alias; - - /** */ @GridToStringInclude private String qry; @@ -66,14 +63,12 @@ public class GridCacheSqlQuery implements Message { } /** - * @param alias Alias. * @param qry Query. * @param params Query parameters. */ - public GridCacheSqlQuery(String alias, String qry, Object[] params) { + public GridCacheSqlQuery(String qry, Object[] params) { A.ensure(!F.isEmpty(qry), "qry must not be empty"); - this.alias = alias; this.qry = qry; this.params = F.isEmpty(params) ? EMPTY_PARAMS : params; @@ -97,13 +92,6 @@ public class GridCacheSqlQuery implements Message { } /** - * @return Alias. - */ - public String alias() { - return alias; - } - - /** * @return Query. */ public String query() { @@ -161,7 +149,7 @@ public class GridCacheSqlQuery implements Message { switch (writer.state()) { case 0: - if (!writer.writeString("alias", alias)) + if (!writer.writeString("alias", null)) return false; writer.incrementState(); @@ -192,7 +180,7 @@ public class GridCacheSqlQuery implements Message { switch (reader.state()) { case 0: - alias = reader.readString("alias"); + reader.readString("alias"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 8613df8..83a79e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -17,10 +17,7 @@ package org.apache.ignite.internal.processors.cache.query; -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; @@ -34,7 +31,7 @@ public class GridCacheTwoStepQuery { /** */ @GridToStringInclude - private Map<String, GridCacheSqlQuery> mapQrys; + private List<GridCacheSqlQuery> mapQrys = new ArrayList<>(); /** */ @GridToStringInclude @@ -93,15 +90,7 @@ public class GridCacheTwoStepQuery { * @param qry SQL Query. */ public void addMapQuery(GridCacheSqlQuery qry) { - String alias = qry.alias(); - - A.ensure(!F.isEmpty(alias), "alias must not be empty"); - - if (mapQrys == null) - mapQrys = new GridLeanMap<>(); - - if (mapQrys.put(alias, qry) != null) - throw new IgniteException("Failed to add query, alias already exists: " + alias + "."); + mapQrys.add(qry); } /** @@ -114,8 +103,8 @@ public class GridCacheTwoStepQuery { /** * @return Map queries. */ - public Collection<GridCacheSqlQuery> mapQueries() { - return mapQrys.values(); + public List<GridCacheSqlQuery> mapQueries() { + return mapQrys; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index df6ac49..dc61d76 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1329,14 +1329,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - executeStatement("INFORMATION_SCHEMA", "SHUTDOWN"); - for (Connection c : conns) U.close(c, log); conns.clear(); schemas.clear(); + try (Connection c = DriverManager.getConnection(dbUrl); + Statement s = c.createStatement()) { + s.execute("SHUTDOWN"); + } + catch (SQLException e) { + U.error(log, "Failed to shutdown database.", e); + } + if (log.isDebugEnabled()) log.debug("Cache query index stopped."); } @@ -1352,9 +1358,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { createSchema(schema); - executeStatement(schema, "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME + - " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\""); - createSqlFunctions(schema, ccfg.getSqlFunctionClasses()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 9326b01..2f8bcdd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -22,7 +22,6 @@ import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.util.typedef.*; import org.h2.jdbc.*; -import org.h2.value.*; import org.jetbrains.annotations.*; import java.util.*; @@ -35,20 +34,20 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlacehol */ public class GridSqlQuerySplitter { /** */ - private static final String TABLE_PREFIX = "__T"; + private static final String TABLE_SCHEMA = "PUBLIC"; /** */ - private static final String COLUMN_PREFIX = "__C"; + private static final String TABLE_PREFIX = "__T"; /** */ - public static final String TABLE_FUNC_NAME = "__Z0"; + private static final String COLUMN_PREFIX = "__C"; /** * @param idx Index of table. - * @return Table name. + * @return Table. */ - private static String table(int idx) { - return TABLE_PREFIX + idx; + public static GridSqlTable table(int idx) { + return new GridSqlTable(TABLE_SCHEMA, TABLE_PREFIX + idx); } /** @@ -141,13 +140,11 @@ public class GridSqlQuerySplitter { // nullifying or updating things, have to make sure that we will not need them in the original form later. final GridSqlSelect mapQry = wrapUnion(collectAllSpaces(GridSqlQueryParser.parse(stmt), spaces)); - final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO IGNITE-1142 - final boolean explain = mapQry.explain(); mapQry.explain(false); - GridSqlSelect rdcQry = new GridSqlSelect().from(new GridSqlFunction(null, TABLE_FUNC_NAME)); // table(mergeTable)); TODO IGNITE-1142 + GridSqlSelect rdcQry = new GridSqlSelect().from(table(0)); // Split all select expressions into map-reduce parts. List<GridSqlElement> mapExps = F.addAll(new ArrayList<GridSqlElement>(mapQry.allColumns()), @@ -218,10 +215,10 @@ public class GridSqlQuerySplitter { } // Build resulting two step query. - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, new GridCacheSqlQuery(null, rdcQry.getSQL(), + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, new GridCacheSqlQuery(rdcQry.getSQL(), findParams(rdcQry, params, new ArrayList<>()).toArray())); - res.addMapQuery(new GridCacheSqlQuery(mergeTable, mapQry.getSQL(), + res.addMapQuery(new GridCacheSqlQuery(mapQry.getSQL(), findParams(mapQry, params, new ArrayList<>(params.length)).toArray()) .columns(collectColumns(mapExps))); @@ -458,13 +455,6 @@ public class GridSqlQuerySplitter { if (idx < rdcSelect.length) { // SELECT __C0 AS original_alias GridSqlElement rdcEl = column(mapColAlias); - GridSqlType type = el.resultType(); - - assert type != null; - - if (type.type() == Value.UUID) // There is no JDBC type UUID, so conversion to bytes occurs. - rdcEl = function(CAST).resultType(GridSqlType.UUID).addChild(rdcEl); // TODO IGNITE-1142 - remove this cast when table function removed - if (colNames.add(rdcColAlias)) // To handle column name duplication (usually wildcard for few tables). rdcEl = alias(rdcColAlias, rdcEl); @@ -662,12 +652,4 @@ public class GridSqlQuerySplitter { private static GridSqlFunction function(GridSqlFunctionType type) { return new GridSqlFunction(type); } - - /** - * @param name Table name. - * @return Table. - */ - private static GridSqlTable table(String name) { - return new GridSqlTable(null, name); - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java index c9cdff2..26a92ae 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import org.h2.api.*; import org.h2.command.ddl.*; import org.h2.engine.*; import org.h2.index.*; @@ -153,34 +152,4 @@ public class GridMergeTable extends TableBase { @Override public void checkRename() { throw DbException.getUnsupportedException("rename"); } - - /** - * Engine. - */ - public static class Engine implements TableEngine { - /** */ - private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>(); - - /** - * @return Created table. - */ - public static GridMergeTable getCreated() { - GridMergeTable tbl = createdTbl.get(); - - assert tbl != null; - - createdTbl.remove(); - - return tbl; - } - - /** {@inheritDoc} */ - @Override public Table createTable(CreateTableData data) { - GridMergeTable tbl = new GridMergeTable(data); - - createdTbl.set(tbl); - - return tbl; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index e34ddd6..6a988e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -39,12 +39,9 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; import org.h2.command.ddl.*; import org.h2.engine.*; -import org.h2.expression.*; -import org.h2.index.*; import org.h2.jdbc.*; import org.h2.result.*; import org.h2.table.*; -import org.h2.tools.*; import org.h2.util.*; import org.h2.value.*; import org.jetbrains.annotations.*; @@ -56,6 +53,7 @@ import java.sql.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; @@ -79,7 +77,10 @@ public class GridReduceQueryExecutor { private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>(); /** */ - private static ThreadLocal<GridMergeTable> curFunTbl = new ThreadLocal<>(); + private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList(); + + /** */ + private final Lock fakeTblsLock = new ReentrantLock(); /** */ private static final Constructor<JdbcResultSet> CONSTRUCTOR; @@ -462,11 +463,13 @@ public class GridReduceQueryExecutor { nodes = Collections.singleton(F.rand(nodes)); } + int tblIdx = 0; + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl; try { - tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO + tbl = createMergeTable(r.conn, mapQry, qry.explain()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -479,7 +482,7 @@ public class GridReduceQueryExecutor { r.tbls.add(tbl); - curFunTbl.set(tbl); + fakeTable(r.conn, tblIdx++).setInnerTable(tbl); } r.latch = new CountDownLatch(r.tbls.size() * nodes.size()); @@ -499,7 +502,7 @@ public class GridReduceQueryExecutor { mapQrys = new ArrayList<>(qry.mapQueries().size()); for (GridCacheSqlQuery mapQry : qry.mapQueries()) - mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters())); + mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters())); } if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes. @@ -552,8 +555,6 @@ public class GridReduceQueryExecutor { for (GridMergeTable tbl : r.tbls) { if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. send(nodes, new GridQueryCancelRequest(qryReqId), null); - -// dropTable(r.conn, tbl.getName()); TODO } if (retry) { @@ -587,12 +588,61 @@ public class GridReduceQueryExecutor { if (!runs.remove(qryReqId, r)) U.warn(log, "Query run was already removed: " + qryReqId); - curFunTbl.remove(); + for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) + fakeTable(null, i).setInnerTable(null); // Drop all merge tables. } } } /** + * @param idx Table index. + * @return Table name. + */ + private static String table(int idx) { + return GridSqlQuerySplitter.table(idx).getSQL(); + } + + /** + * Gets or creates new fake table for index. + * + * @param idx Index of table. + * @return Table. + */ + private GridThreadLocalTable fakeTable(Connection c, int idx) { + List<GridThreadLocalTable> tbls = fakeTbls; + + assert tbls.size() >= idx; + + if (tbls.size() == idx) { // If table for such index does not exist, create one. + fakeTblsLock.lock(); + + try { + if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock. + try (Statement stmt = c.createStatement()) { + stmt.executeUpdate("CREATE TABLE " + table(idx) + + "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"'); + } + catch (SQLException e) { + throw new IllegalStateException(e); + } + + List<GridThreadLocalTable> newTbls = new ArrayList<>(tbls.size() + 1); + + newTbls.addAll(tbls); + newTbls.add(GridThreadLocalTable.Engine.getCreated()); + + fakeTbls = tbls = newTbls; + } + } + finally { + fakeTblsLock.unlock(); + } + } + + return tbls.get(idx); + } + + /** * Calculates data nodes for replicated caches on unstable topology. * * @param cctx Cache context for main space. @@ -825,16 +875,18 @@ public class GridReduceQueryExecutor { throws IgniteCheckedException { List<List<?>> lists = new ArrayList<>(); - for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + mapQry.alias(), null); + for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) { + ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null); lists.add(F.asList(getPlan(rs))); } + int tblIdx = 0; + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - GridMergeTable tbl = createFunctionTable(c, mapQry, false); + GridMergeTable tbl = createMergeTable(c, mapQry, false); - curFunTbl.set(tbl); // Now it will be only a single table. + fakeTable(c, tblIdx++).setInnerTable(tbl); } GridCacheSqlQuery rdc = qry.reduceQuery(); @@ -928,118 +980,12 @@ public class GridReduceQueryExecutor { /** * @param conn Connection. - * @param tblName Table name. - * @throws SQLException If failed. - */ - private void dropTable(Connection conn, String tblName) throws SQLException { - try (Statement s = conn.createStatement()) { - s.execute("DROP TABLE " + tblName); - } - } - - /** - * @return Merged result set. - */ - public static ResultSet mergeTableFunction(JdbcConnection c) throws Exception { - GridMergeTable tbl = curFunTbl.get(); - - Session ses = (Session)c.getSession(); - - String url = c.getMetaData().getURL(); - - // URL is either "jdbc:default:connection" or "jdbc:columnlist:connection" - final Cursor cursor = url.charAt(5) == 'c' ? null : tbl.getScanIndex(ses).find(ses, null, null); - - final Column[] cols = tbl.getColumns(); - - SimpleResultSet rs = new SimpleResultSet(cursor == null ? null : new SimpleRowSource() { - @Override public Object[] readRow() throws SQLException { - if (!cursor.next()) - return null; - - Row r = cursor.get(); - - Object[] row = new Object[cols.length]; - - for (int i = 0; i < row.length; i++) - row[i] = r.getValue(i).getObject(); - - return row; - } - - @Override public void close() { - // No-op. - } - - @Override public void reset() throws SQLException { - throw new SQLException("Unsupported."); - } - }) { - @Override public byte[] getBytes(int colIdx) throws SQLException { - assert cursor != null; - - return cursor.get().getValue(colIdx - 1).getBytes(); - } - - @Override public <T> T getObject(int columnIndex, Class<T> type) throws SQLException { - throw new UnsupportedOperationException(); - } - - @Override public <T> T getObject(String columnLabel, Class<T> type) throws SQLException { - throw new UnsupportedOperationException(); - } - }; - - for (Column col : cols) - rs.addColumn(col.getName(), DataType.convertTypeToSQLType(col.getType()), - MathUtils.convertLongToInt(col.getPrecision()), col.getScale()); - - return rs; - } - - /** - * @param asQuery Query. - * @return List of columns. - */ - private static ArrayList<Column> generateColumnsFromQuery(org.h2.command.dml.Query asQuery) { - int columnCount = asQuery.getColumnCount(); - ArrayList<Expression> expressions = asQuery.getExpressions(); - ArrayList<Column> cols = new ArrayList<>(); - for (int i = 0; i < columnCount; i++) { - Expression expr = expressions.get(i); - int type = expr.getType(); - String name = expr.getAlias(); - long precision = expr.getPrecision(); - int displaySize = expr.getDisplaySize(); - DataType dt = DataType.getDataType(type); - if (precision > 0 && (dt.defaultPrecision == 0 || - (dt.defaultPrecision > precision && dt.defaultPrecision < Byte.MAX_VALUE))) { - // dont' set precision to MAX_VALUE if this is the default - precision = dt.defaultPrecision; - } - int scale = expr.getScale(); - if (scale > 0 && (dt.defaultScale == 0 || - (dt.defaultScale > scale && dt.defaultScale < precision))) { - scale = dt.defaultScale; - } - if (scale > precision) { - precision = scale; - } - Column col = new Column(name, type, precision, scale, displaySize); - cols.add(col); - } - - return cols; - } - - /** - * @param conn Connection. * @param qry Query. * @param explain Explain. * @return Table. * @throws IgniteCheckedException */ - private GridMergeTable createFunctionTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain) + private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain) throws IgniteCheckedException { try { Session ses = (Session)conn.getSession(); @@ -1094,32 +1040,6 @@ public class GridReduceQueryExecutor { } /** - * @param conn Connection. - * @param qry Query. - * @return Table. - * @throws IgniteCheckedException If failed. - */ - private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException { - try { - try (PreparedStatement s = conn.prepareStatement( - "CREATE LOCAL TEMPORARY TABLE " + qry.alias() + - " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " + - " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) { - h2.bindParameters(s, F.asList(qry.parameters())); - - s.execute(); - } - - return GridMergeTable.Engine.getCreated(); - } - catch (SQLException e) { - U.closeQuiet(conn); - - throw new IgniteCheckedException(e); - } - } - - /** * @param reconnectFut Reconnect future. */ public void onDisconnected(IgniteFuture<?> reconnectFut) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java new file mode 100644 index 0000000..c468371 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.h2.api.*; +import org.h2.command.ddl.*; +import org.h2.engine.*; +import org.h2.index.*; +import org.h2.result.*; +import org.h2.schema.*; +import org.h2.table.*; +import org.h2.value.*; + +import java.util.*; + +/** + * Thread local table wrapper for another table instance. + */ +public class GridThreadLocalTable extends Table { + /** Delegate table */ + private final ThreadLocal<Table> tbl = new ThreadLocal<>(); + + /** + * @param schema Schema. + * @param id ID. + * @param name Table name. + * @param persistIndexes Persist indexes. + * @param persistData Persist data. + */ + public GridThreadLocalTable(Schema schema, int id, String name, boolean persistIndexes, boolean persistData) { + super(schema, id, name, persistIndexes, persistData); + } + + /** + * @param t Table or {@code null} to reset existing. + */ + public void setInnerTable(Table t) { + if (t == null) + tbl.remove(); + else + tbl.set(t); + } + + /** {@inheritDoc} */ + @Override public Index getPrimaryKey() { + return tbl.get().getPrimaryKey(); + } + + /** {@inheritDoc} */ + @Override public Column getRowIdColumn() { + return tbl.get().getRowIdColumn(); + } + + /** {@inheritDoc} */ + @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) { + return tbl.get().getBestPlanItem(session, masks, filter, sortOrder); + } + + /** {@inheritDoc} */ + @Override public Value getDefaultValue(Session session, Column column) { + return tbl.get().getDefaultValue(session, column); + } + + /** {@inheritDoc} */ + @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) { + return tbl.get().getTemplateSimpleRow(singleColumn); + } + + /** {@inheritDoc} */ + @Override public Row getTemplateRow() { + return tbl.get().getTemplateRow(); + } + + /** {@inheritDoc} */ + @Override public Column getColumn(String columnName) { + return tbl.get().getColumn(columnName); + } + + /** {@inheritDoc} */ + @Override public Column getColumn(int index) { + return tbl.get().getColumn(index); + } + + /** {@inheritDoc} */ + @Override public Index getIndexForColumn(Column column) { + return tbl.get().getIndexForColumn(column); + } + + /** {@inheritDoc} */ + @Override public Column[] getColumns() { + return tbl.get().getColumns(); + } + + /** {@inheritDoc} */ + @Override protected void setColumns(Column[] columns) { + throw new IllegalStateException("Cols: " + Arrays.asList(columns)); + } + + /** {@inheritDoc} */ + @Override public void lock(Session session, boolean exclusive, boolean force) { + tbl.get().lock(session, exclusive, force); + } + + /** {@inheritDoc} */ + @Override public void close(Session session) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unlock(Session s) { + tbl.get().unlock(s); + } + + /** {@inheritDoc} */ + @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols, + IndexType indexType, boolean create, String indexComment) { + return tbl.get().addIndex(session, indexName, indexId, cols, indexType, create, indexComment); + } + + /** {@inheritDoc} */ + @Override public void removeRow(Session session, Row row) { + tbl.get().removeRow(session, row); + } + + /** {@inheritDoc} */ + @Override public void truncate(Session session) { + tbl.get().truncate(session); + } + + /** {@inheritDoc} */ + @Override public void addRow(Session session, Row row) { + tbl.get().addRow(session, row); + } + + /** {@inheritDoc} */ + @Override public void checkSupportAlter() { + tbl.get().checkSupportAlter(); + } + + /** {@inheritDoc} */ + @Override public String getTableType() { + return tbl.get().getTableType(); + } + + /** {@inheritDoc} */ + @Override public Index getUniqueIndex() { + return tbl.get().getUniqueIndex(); + } + + /** {@inheritDoc} */ + @Override public Index getScanIndex(Session session) { + return tbl.get().getScanIndex(session); + } + + /** {@inheritDoc} */ + @Override public ArrayList<Index> getIndexes() { + return tbl.get().getIndexes(); + } + + /** {@inheritDoc} */ + @Override public boolean isLockedExclusively() { + return tbl.get().isLockedExclusively(); + } + + /** {@inheritDoc} */ + @Override public long getMaxDataModificationId() { + return tbl.get().getMaxDataModificationId(); + } + + /** {@inheritDoc} */ + @Override public boolean isDeterministic() { + return tbl.get().isDeterministic(); + } + + /** {@inheritDoc} */ + @Override public boolean canGetRowCount() { + return tbl.get().canGetRowCount(); + } + + /** {@inheritDoc} */ + @Override public boolean canDrop() { + return tbl.get().canDrop(); + } + + /** {@inheritDoc} */ + @Override public long getRowCount(Session session) { + return tbl.get().getRowCount(session); + } + + /** {@inheritDoc} */ + @Override public long getRowCountApproximation() { + return tbl.get().getRowCountApproximation(); + } + + /** {@inheritDoc} */ + @Override public long getDiskSpaceUsed() { + return tbl.get().getDiskSpaceUsed(); + } + + /** {@inheritDoc} */ + @Override public String getCreateSQL() { + return ""; + } + + /** {@inheritDoc} */ + @Override public String getDropSQL() { + return tbl.get().getDropSQL(); + } + + /** {@inheritDoc} */ + @Override public void checkRename() { + tbl.get().checkRename(); + } + + /** + * Engine. + */ + public static class Engine implements TableEngine { + /** */ + private static ThreadLocal<GridThreadLocalTable> createdTbl = new ThreadLocal<>(); + + /** + * @return Created table. + */ + public static GridThreadLocalTable getCreated() { + GridThreadLocalTable tbl = createdTbl.get(); + + assert tbl != null; + + createdTbl.remove(); + + return tbl; + } + + /** {@inheritDoc} */ + @Override public Table createTable(CreateTableData d) { + assert createdTbl.get() == null; + + GridThreadLocalTable tbl = new GridThreadLocalTable(d.schema, d.id, d.tableName, d.persistIndexes, + d.persistData); + + createdTbl.set(tbl); + + return tbl; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab5c7e41/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java index ccb3115..18bfd57 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java @@ -316,7 +316,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA if (cacheMode() == PARTITIONED) { assertEquals(2, res.size()); - assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.TABLE_FUNC_NAME)); + assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.table(0).getSQL())); } else assertEquals(1, res.size());