http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java new file mode 100644 index 0000000..3a850cc --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java @@ -0,0 +1,1998 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ +* __ ____/___________(_)______ /__ ____/______ ____(_)_______ +* _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ +* / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / +* \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +*/ + +package org.gridgain.grid.kernal.processors.query.h2; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.indexing.*; +import org.gridgain.grid.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.query.*; +import org.gridgain.grid.kernal.processors.query.h2.opt.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.offheap.unsafe.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.h2.api.*; +import org.h2.command.*; +import org.h2.constant.*; +import org.h2.index.*; +import org.h2.jdbc.*; +import org.h2.message.*; +import org.h2.mvstore.cache.*; +import org.h2.server.web.*; +import org.h2.table.*; +import org.h2.tools.*; +import org.h2.util.*; +import org.h2.value.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.lang.reflect.*; +import java.math.*; +import java.sql.*; +import java.text.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.gridgain.grid.kernal.processors.query.GridQueryIndexType.*; +import static org.gridgain.grid.kernal.processors.query.h2.opt.GridH2AbstractKeyValueRow.*; +import static org.h2.result.SortOrder.*; + +/** + * Indexing implementation based on H2 database engine. In this implementation main query language is SQL, + * fulltext indexing can be performed using Lucene. For each registered space + * the SPI will create respective schema, for default space (where space name is null) schema + * with name {@code PUBLIC} will be used. To avoid name conflicts user should not explicitly name + * a schema {@code PUBLIC}. + * <p> + * For each registered {@link GridQueryTypeDescriptor} this SPI will create respective SQL table with + * {@code '_key'} and {@code '_val'} fields for key and value, and fields from + * {@link GridQueryTypeDescriptor#keyFields()} and {@link GridQueryTypeDescriptor#valueFields()}. + * For each table it will create indexes declared in {@link GridQueryTypeDescriptor#indexes()}. + * <h1 class="header">Some important defaults.</h1> + * <ul> + * <li>All the data will be kept in memory</li> + * <li>Primitive types will not be indexed (e.g. java types which can be directly converted to SQL types)</li> + * <li> + * Key types will be converted to SQL types, so it is impossible to store one value type with + * different key types + * </li> + * </ul> + * @see GridIndexingSpi + */ +@SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"}) +public class GridH2Indexing implements GridQueryIndexing { + /** Default DB options. */ + private static final String DFLT_DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" + + ";DEFAULT_LOCK_TIMEOUT=10000"; + + /** Options for optimized mode to work properly. */ + private static final String OPTIMIZED_DB_OPTIONS = ";OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0;" + + "RECOMPILE_ALWAYS=1;MAX_OPERATION_MEMORY=0"; + + /** Field name for key. */ + public static final String KEY_FIELD_NAME = "_key"; + + /** Field name for value. */ + public static final String VAL_FIELD_NAME = "_val"; + + /** */ + private static final Field COMMAND_FIELD; + + /** + * Command in H2 prepared statement. + */ + static { + try { + COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command"); + + COMMAND_FIELD.setAccessible(true); + } + catch (NoSuchFieldException e) { + throw new IllegalStateException("Check H2 version in classpath.", e); + } + } + + /** */ + private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>(); + + /** */ + private volatile String cachedSearchPathCmd; + + /** Cache for deserialized offheap rows. */ + private CacheLongKeyLIRS<GridH2KeyValueRowOffheap> rowCache = new CacheLongKeyLIRS<>(32 * 1024, 1, 128, 256); + + /** Logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Node ID. */ + @IgniteLocalNodeIdResource + private UUID nodeId; + + /** */ + @IgniteMarshallerResource + private IgniteMarshaller marshaller; + + /** */ + private GridUnsafeMemory offheap; + + /** */ + private final Collection<String> schemaNames = new GridConcurrentHashSet<>(); + + /** Collection of schemaNames and registered tables. */ + private final ConcurrentMap<String, Schema> schemas = new ConcurrentHashMap8<>(); + + /** */ + private String dbUrl = "jdbc:h2:mem:"; + + /** */ + private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>()); + + /** */ + private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() { + @Nullable @Override public ConnectionWrapper get() { + ConnectionWrapper c = super.get(); + + boolean reconnect = true; + + try { + reconnect = c == null || c.connection().isClosed(); + } + catch (SQLException e) { + U.warn(log, "Failed to check connection status.", e); + } + + if (reconnect) { + c = initialValue(); + + set(c); + } + + return c; + } + + @Nullable @Override protected ConnectionWrapper initialValue() { + Connection c = null; + + try { + c = DriverManager.getConnection(dbUrl); + + String[] searchPath = cfg.getSearchPath(); + + if (!F.isEmpty(searchPath)) { + try (Statement s = c.createStatement()) { + String cmd = cachedSearchPathCmd; + + if (cmd == null) { + SB b = new SB("SET SCHEMA_SEARCH_PATH "); + + for (int i = 0; i < searchPath.length; i++) { + if (i != 0) + b.a(','); + + b.a('"').a(schema(searchPath[i])).a('"'); + } + + cachedSearchPathCmd = cmd = b.toString(); + } + + s.executeUpdate(cmd); + } + } + + conns.add(c); + + return new ConnectionWrapper(c); + } + catch (SQLException e) { + U.close(c, log); + + throw new GridRuntimeException("Failed to initialize DB connection: " + dbUrl, e); + } + } + }; + + /** */ + private volatile GridQueryConfiguration cfg = new GridQueryConfiguration(); + + /** */ + private volatile GridKernalContext ctx; + + /** + * Gets DB connection. + * + * @param schema Whether to set schema for connection or not. + * @return DB connection. + * @throws GridException In case of error. + */ + private Connection connectionForThread(@Nullable String schema) throws GridException { + ConnectionWrapper c = connCache.get(); + + if (c == null) + throw new GridException("Failed to get DB connection for thread (check log for details)."); + + if (schema != null && !F.eq(c.schema(), schema)) { + Statement stmt = null; + + try { + stmt = c.connection().createStatement(); + + stmt.executeUpdate("SET SCHEMA \"" + schema + '"'); + + if (log.isDebugEnabled()) + log.debug("Initialized H2 schema for queries on space: " + schema); + + c.schema(schema); + } + catch (SQLException e) { + throw new GridException("Failed to set schema for DB connection for thread [schema=" + + schema + "]", e); + } + finally { + U.close(stmt, log); + } + } + + return c.connection(); + } + + /** + * Creates DB schema if it has not been created yet. + * + * @param schema Schema name. + * @throws GridException If failed to create db schema. + */ + private void createSchemaIfAbsent(String schema) throws GridException { + executeStatement("CREATE SCHEMA IF NOT EXISTS \"" + schema + '"'); + + if (log.isDebugEnabled()) + log.debug("Created H2 schema for index database: " + schema); + } + + /** + * @param sql SQL statement. + * @throws GridException If failed. + */ + private void executeStatement(String sql) throws GridException { + Statement stmt = null; + + try { + Connection c = connectionForThread(null); + + stmt = c.createStatement(); + + stmt.executeUpdate(sql); + } + catch (SQLException e) { + onSqlException(); + + throw new GridException("Failed to execute statement: " + sql, e); + } + finally { + U.close(stmt, log); + } + } + + /** + * Removes entry with specified key from any tables (if exist). + * + * @param spaceName Space name. + * @param key Key. + * @param tblToUpdate Table to update. + * @throws GridException In case of error. + */ + private void removeKey(@Nullable String spaceName, Object key, TableDescriptor tblToUpdate) + throws GridException { + try { + Collection<TableDescriptor> tbls = tables(schema(spaceName)); + + if (tbls.size() > 1) { + boolean fixedTyping = isIndexFixedTyping(spaceName); + + for (TableDescriptor tbl : tbls) { + if (tbl != tblToUpdate && (tbl.type().keyClass().equals(key.getClass()) || + !fixedTyping)) { + if (tbl.tbl.update(key, null, 0)) { + if (tbl.luceneIdx != null) + tbl.luceneIdx.remove(key); + + return; + } + } + } + } + } + catch (Exception e) { + throw new GridException("Failed to remove key: " + key, e); + } + } + + /** + * Binds object to prepared statement. + * + * @param stmt SQL statement. + * @param idx Index. + * @param obj Value to store. + * @throws GridException If failed. + */ + private void bindObject(PreparedStatement stmt, int idx, @Nullable Object obj) throws GridException { + try { + if (obj == null) + stmt.setNull(idx, Types.VARCHAR); + else + stmt.setObject(idx, obj); + } + catch (SQLException e) { + throw new GridException("Failed to bind parameter [idx=" + idx + ", obj=" + obj + ']', e); + } + } + + /** + * Handles SQL exception. + */ + private void onSqlException() { + Connection conn = connCache.get().connection(); + + connCache.set(null); + + if (conn != null) { + conns.remove(conn); + + // Reset connection to receive new one at next call. + U.close(conn, log); + } + } + + /** {@inheritDoc} */ + @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Object k, Object v, byte[] ver, + long expirationTime) throws GridException { + TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl == null) + return; // Type was rejected. + + localSpi.set(this); + + try { + removeKey(spaceName, k, tbl); + + if (expirationTime == 0) + expirationTime = Long.MAX_VALUE; + + tbl.tbl.update(k, v, expirationTime); + + if (tbl.luceneIdx != null) + tbl.luceneIdx.store(k, v, ver, expirationTime); + } + finally { + localSpi.remove(); + } + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable String spaceName, Object key) throws GridException { + if (log.isDebugEnabled()) + log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']'); + + localSpi.set(this); + + try { + for (TableDescriptor tbl : tables(schema(spaceName))) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + if (tbl.tbl.update(key, null, 0)) { + if (tbl.luceneIdx != null) + tbl.luceneIdx.remove(key); + + return; + } + } + } + } + finally { + localSpi.remove(); + } + } + + /** {@inheritDoc} */ + @Override public void onSwap(@Nullable String spaceName, Object key) throws GridException { + Schema schema = schemas.get(schema(spaceName)); + + if (schema == null) + return; + + localSpi.set(this); + + try { + for (TableDescriptor tbl : schema.values()) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + try { + if (tbl.tbl.onSwap(key)) + return; + } + catch (GridException e) { + throw new GridException(e); + } + } + } + } + finally { + localSpi.remove(); + } + } + + /** {@inheritDoc} */ + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes) + throws GridException { + localSpi.set(this); + + try { + for (TableDescriptor tbl : tables(schema(spaceName))) { + if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) { + try { + if (tbl.tbl.onUnswap(key, val)) + return; + } + catch (GridException e) { + throw new GridException(e); + } + } + } + } + finally { + localSpi.remove(); + } + } + + /** + * Drops table form h2 database and clear all related indexes (h2 text, lucene). + * + * @param tbl Table to unregister. + * @throws GridException If failed to unregister. + */ + private void removeTable(TableDescriptor tbl) throws GridException { + assert tbl != null; + + if (log.isDebugEnabled()) + log.debug("Removing query index table: " + tbl.fullTableName()); + + Connection c = connectionForThread(null); + + Statement stmt = null; + + try { + // NOTE: there is no method dropIndex() for lucene engine correctly working. + // So we have to drop all lucene index. + // FullTextLucene.dropAll(c); TODO: GG-4015: fix this + + stmt = c.createStatement(); + + String sql = "DROP TABLE IF EXISTS " + tbl.fullTableName(); + + if (log.isDebugEnabled()) + log.debug("Dropping database index table with SQL: " + sql); + + stmt.executeUpdate(sql); + } + catch (SQLException e) { + onSqlException(); + + throw new GridException("Failed to drop database index table [type=" + tbl.type().name() + + ", table=" + tbl.fullTableName() + "]", e); + } + finally { + U.close(stmt, log); + } + + tbl.tbl.close(); + + if (tbl.luceneIdx != null) + U.closeQuiet(tbl.luceneIdx); + + ConcurrentMap<String, TableDescriptor> tbls = schemas.get(tbl.schema()); + + if (!F.isEmpty(tbls)) + tbls.remove(tbl.name()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText( + @Nullable String spaceName, String qry, GridQueryTypeDescriptor type, + GridIndexingQueryFilter filters) throws GridException { + TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl != null && tbl.luceneIdx != null) + return tbl.luceneIdx.query(qry, filters); + + return new GridEmptyCloseableIterator<>(); + } + + /** {@inheritDoc} */ + @Override public void unregisterType(@Nullable String spaceName, GridQueryTypeDescriptor type) + throws GridException { + TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl != null) + removeTable(tbl); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry, + @Nullable final Collection<Object> params, final GridIndexingQueryFilter filters) + throws GridException { + localSpi.set(this); + + setFilters(filters); + + try { + Connection conn = connectionForThread(schema(spaceName)); + + ResultSet rs = executeSqlQueryWithTimer(conn, qry, params); + + List<GridQueryFieldMetadata> meta = null; + + if (rs != null) { + try { + ResultSetMetaData rsMeta = rs.getMetaData(); + + meta = new ArrayList<>(rsMeta.getColumnCount()); + + for (int i = 1; i <= rsMeta.getColumnCount(); i++) { + String schemaName = rsMeta.getSchemaName(i); + String typeName = rsMeta.getTableName(i); + String name = rsMeta.getColumnLabel(i); + String type = rsMeta.getColumnClassName(i); + + meta.add(new SqlFieldMetadata(schemaName, typeName, name, type)); + } + } + catch (SQLException e) { + throw new IgniteSpiException("Failed to get meta data.", e); + } + } + + return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs)); + } + finally { + setFilters(null); + + localSpi.remove(); + } + } + + /** + * @param stmt Prepared statement. + * @return Command type. + */ + private static int commandType(PreparedStatement stmt) { + try { + return ((CommandInterface)COMMAND_FIELD.get(stmt)).getCommandType(); + } + catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + + /** + * @return Configuration. + */ + public GridQueryConfiguration configuration() { + return cfg; + } + + /** + * Executes sql query. + * + * @param conn Connection,. + * @param sql Sql query. + * @param params Parameters. + * @return Result. + * @throws GridException If failed. + */ + @Nullable private ResultSet executeSqlQuery(Connection conn, String sql, + @Nullable Collection<Object> params) throws GridException { + PreparedStatement stmt; + + try { + stmt = conn.prepareStatement(sql); + } + catch (SQLException e) { + if (e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1) + return null; + + throw new GridException("Failed to parse SQL query: " + sql, e); + } + + switch (commandType(stmt)) { + case CommandInterface.SELECT: + case CommandInterface.CALL: + case CommandInterface.EXPLAIN: + case CommandInterface.ANALYZE: + break; + default: + throw new GridException("Failed to execute non-query SQL statement: " + sql); + } + + bindParameters(stmt, params); + + try { + return stmt.executeQuery(); + } + catch (SQLException e) { + throw new GridException("Failed to execute SQL query.", e); + } + } + + /** + * Executes sql query and prints warning if query is too slow.. + * + * @param conn Connection,. + * @param sql Sql query. + * @param params Parameters. + * @return Result. + * @throws GridException If failed. + */ + private ResultSet executeSqlQueryWithTimer(Connection conn, String sql, + @Nullable Collection<Object> params) throws GridException { + long start = U.currentTimeMillis(); + + try { + ResultSet rs = executeSqlQuery(conn, sql, params); + + long time = U.currentTimeMillis() - start; + + long longQryExecTimeout = cfg.getLongQueryExecutionTimeout(); + + if (time > longQryExecTimeout) { + String msg = "Query execution is too long (" + time + " ms): " + sql; + + String longMsg = msg; + + if (cfg.isLongQueryExplain()) { + ResultSet plan = executeSqlQuery(conn, "EXPLAIN " + sql, params); + + if (plan == null) + longMsg = "Failed to explain plan because required table does not exist: " + sql; + else { + plan.next(); + + // Add SQL explain result message into log. + longMsg = "Query execution is too long [time=" + time + " ms, sql='" + sql + '\'' + + ", plan=" + U.nl() + plan.getString(1) + U.nl() + ", parameters=" + params + "]"; + } + } + + LT.warn(log, null, longMsg, msg); + } + + return rs; + } + catch (SQLException e) { + onSqlException(); + + throw new GridException(e); + } + } + + /** + * Executes query. + * + * @param qry Query. + * @param params Query parameters. + * @param tbl Target table of query to generate select. + * @return Result set. + * @throws GridException If failed. + */ + private ResultSet executeQuery(String qry, @Nullable Collection<Object> params, + @Nullable TableDescriptor tbl) throws GridException { + Connection conn = connectionForThread(tbl != null ? tbl.schema() : "PUBLIC"); + + String sql = generateQuery(qry, tbl); + + return executeSqlQueryWithTimer(conn, sql, params); + } + + /** + * Binds parameters to prepared statement. + * + * @param stmt Prepared statement. + * @param params Parameters collection. + * @throws GridException If failed. + */ + private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws GridException { + if (!F.isEmpty(params)) { + int idx = 1; + + for (Object arg : params) + bindObject(stmt, idx++, arg); + } + } + + /** + * Executes regular query. + * Note that SQL query can not refer to table alias, so use full table name instead. + * + * @param spaceName Space name. + * @param qry Query. + * @param params Query parameters. + * @param type Query return type. + * @param filters Space name and key filters. + * @return Queried rows. + * @throws GridException If failed. + */ + @SuppressWarnings("unchecked") + @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName, + final String qry, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type, + final GridIndexingQueryFilter filters) throws GridException { + final TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl == null) + return new GridEmptyCloseableIterator<>(); + + setFilters(filters); + + localSpi.set(this); + + try { + ResultSet rs = executeQuery(qry, params, tbl); + + return new KeyValIterator(rs); + } + finally { + setFilters(null); + + localSpi.remove(); + } + } + + /** + * Sets filters for current thread. Must be set to not null value + * before executeQuery and reset to null after in finally block since it signals + * to table that it should return content without expired values. + * + * @param filters Filters. + */ + private void setFilters(@Nullable GridIndexingQueryFilter filters) { + GridH2IndexBase.setFiltersForThread(filters); + } + + /** + * Prepares statement for query. + * + * @param qry Query string. + * @param tbl Table to use. + * @return Prepared statement. + * @throws GridException In case of error. + */ + private String generateQuery(String qry, @Nullable TableDescriptor tbl) throws GridException { + boolean needSelect = tbl != null; + + String str = qry.trim().toUpperCase(); + + if (!str.startsWith("FROM")) { + if (str.startsWith("SELECT")) { + if (needSelect) { + StringTokenizer st = new StringTokenizer(str, " "); + + String errMsg = "Wrong query format, query must start with 'select * from' " + + "or 'from' or without such keywords."; + + if (st.countTokens() > 3) { + st.nextToken(); + String wildcard = st.nextToken(); + String from = st.nextToken(); + + if (!"*".equals(wildcard) || !"FROM".equals(from)) + throw new GridException(errMsg); + + needSelect = false; + } + else + throw new GridException(errMsg); + } + } + else { + boolean needWhere = !str.startsWith("ORDER") && !str.startsWith("LIMIT"); + + qry = needWhere ? "FROM " + tbl.fullTableName() + " WHERE " + qry : + "FROM " + tbl.fullTableName() + ' ' + qry; + } + } + + GridStringBuilder ptrn = new SB("SELECT {0}.").a(KEY_FIELD_NAME); + + ptrn.a(", {0}.").a(VAL_FIELD_NAME); + + return needSelect ? MessageFormat.format(ptrn.toString(), tbl.fullTableName()) + ' ' + qry : qry; + } + + /** + * Registers new class description. + * + * This implementation doesn't support type reregistration. + * + * @param type Type description. + * @throws GridException In case of error. + */ + @Override public boolean registerType(@Nullable String spaceName, GridQueryTypeDescriptor type) + throws GridException { + if (!validateTypeDescriptor(spaceName, type)) + return false; + + for (TableDescriptor table : tables(schema(spaceName))) + // Need to compare class names rather than classes to define + // whether a class was previously undeployed. + if (table.type().valueClass().getClass().getName().equals(type.valueClass().getName())) + throw new GridException("Failed to register type in query index because" + + " class is already registered (most likely that class with the same name" + + " was not properly undeployed): " + type); + + TableDescriptor tbl = new TableDescriptor(spaceName, type); + + try { + Connection conn = connectionForThread(null); + + Schema schema = schemas.get(tbl.schema()); + + if (schema == null) { + schema = new Schema(spaceName); + + Schema existing = schemas.putIfAbsent(tbl.schema(), schema); + + if (existing != null) + schema = existing; + } + + createTable(schema, tbl, conn); + + schema.put(tbl.name(), tbl); + } + catch (SQLException e) { + onSqlException(); + + throw new GridException("Failed to register query type: " + type, e); + } + + return true; + } + + /** + * @param cls Class. + * @return True if given class has primitive respective sql type. + */ + private boolean isPrimitive(Class<?> cls) { + DBTypeEnum valType = DBTypeEnum.fromClass(cls); + + return valType != DBTypeEnum.BINARY && valType != DBTypeEnum.OTHER && + valType != DBTypeEnum.ARRAY; + } + + /** + * Validates properties described by query types. + * + * @param spaceName Space name. + * @param type Type descriptor. + * @return True if type is valid. + * @throws GridException If validation failed. + */ + private boolean validateTypeDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type) + throws GridException { + assert type != null; + + boolean keyPrimitive = isPrimitive(type.keyClass()); + boolean valPrimitive = isPrimitive(type.valueClass()); + + // Do not register if value is not primitive and + // there are no indexes or fields defined. + if (!type.valueTextIndex() && type.indexes().isEmpty() && + type.keyFields().isEmpty() && type.valueFields().isEmpty()) + return keyPrimitive && isIndexPrimitiveKey(spaceName) || valPrimitive && isIndexPrimitiveValue(spaceName); + + Collection<String> names = new HashSet<>(); + + names.addAll(type.keyFields().keySet()); + names.addAll(type.valueFields().keySet()); + + if (names.size() < type.keyFields().size() + type.valueFields().size()) + throw new GridException("Found duplicated properties with the same name [keyType=" + + type.keyClass().getName() + ", valueType=" + type.valueClass().getName() + "]"); + + String ptrn = "Name ''{0}'' is reserved and cannot be used as a field name [class=" + type + "]"; + + for (String name : names) { + if (name.equals(KEY_FIELD_NAME) || name.equals(VAL_FIELD_NAME)) + throw new GridException(MessageFormat.format(ptrn, name)); + } + + return true; + } + + /** + * Escapes name to be valid SQL identifier. Currently just replaces '.' and '$' sign with '_'. + * + * @param name Name. + * @param escapeAll Escape flag. + * @return Escaped name. + */ + private static String escapeName(String name, boolean escapeAll) { + if (escapeAll) + return "\"" + name + "\""; + + SB sb = null; + + for (int i = 0; i < name.length(); i++) { + char ch = name.charAt(i); + + if (!Character.isLetter(ch) && !Character.isDigit(ch) && ch != '_' && + !(ch == '"' && (i == 0 || i == name.length() - 1)) && ch != '-') { + // Class name can also contain '$' or '.' - these should be escaped. + assert ch == '$' || ch == '.'; + + if (sb == null) + sb = new SB(); + + sb.a(name.substring(sb.length(), i)); + + // Replace illegal chars with '_'. + sb.a('_'); + } + } + + if (sb == null) + return name; + + sb.a(name.substring(sb.length(), name.length())); + + return sb.toString(); + } + + /** + * Create db table by using given table descriptor. + * + * @param schema Schema. + * @param tbl Table descriptor. + * @param conn Connection. + * @throws SQLException If failed to create db table. + */ + private void createTable(Schema schema, TableDescriptor tbl, Connection conn) throws SQLException { + assert tbl != null; + + boolean keyAsObj = !isIndexFixedTyping(schema.spaceName); + + boolean escapeAll = isEscapeAll(schema.spaceName); + + String keyType = keyAsObj ? "OTHER" : dbTypeFromClass(tbl.type().keyClass()); + String valTypeStr = dbTypeFromClass(tbl.type().valueClass()); + + SB sql = new SB(); + + sql.a("CREATE TABLE ").a(tbl.fullTableName()).a(" (") + .a(KEY_FIELD_NAME).a(' ').a(keyType).a(" NOT NULL"); + + sql.a(',').a(VAL_FIELD_NAME).a(' ').a(valTypeStr); + + for (Map.Entry<String, Class<?>> e: tbl.type().keyFields().entrySet()) + sql.a(',').a(escapeName(e.getKey(), escapeAll)).a(' ').a(dbTypeFromClass(e.getValue())); + + for (Map.Entry<String, Class<?>> e: tbl.type().valueFields().entrySet()) + sql.a(',').a(escapeName(e.getKey(), escapeAll)).a(' ').a(dbTypeFromClass(e.getValue())); + + sql.a(')'); + + if (log.isDebugEnabled()) + log.debug("Creating DB table with SQL: " + sql); + + GridH2RowDescriptor desc = new RowDescriptor(tbl.type(), schema, keyAsObj); + + GridH2Table.Engine.createTable(conn, sql.toString(), desc, tbl, tbl.spaceName); + } + + /** + * Gets corresponding DB type from java class. + * + * @param cls Java class. + * @return DB type name. + */ + private String dbTypeFromClass(Class<?> cls) { + return DBTypeEnum.fromClass(cls).dBTypeAsString(); + } + + /** + * Gets table descriptor by value type. + * + * @param spaceName Space name. + * @param type Value type descriptor. + * @return Table descriptor or {@code null} if not found. + */ + @Nullable private TableDescriptor tableDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type) { + return tableDescriptor(type.name(), spaceName); + } + + /** + * Gets table descriptor by type and space names. + * + * @param type Type name. + * @param space Space name. + * @return Table descriptor. + */ + @Nullable private TableDescriptor tableDescriptor(String type, @Nullable String space) { + ConcurrentMap<String, TableDescriptor> tbls = schemas.get(schema(space)); + + if (tbls == null) + return null; + + return tbls.get(type); + } + + /** + * Gets collection of table for given schema name. + * + * @param schema Schema name. + * @return Collection of table descriptors. + */ + private Collection<TableDescriptor> tables(String schema) { + ConcurrentMap<String, TableDescriptor> tbls = schemas.get(schema); + + if (tbls == null) + return Collections.emptySet(); + + return tbls.values(); + } + + /** + * Gets database schema from space. + * + * @param space Space name. + * @return Schema name. + */ + private static String schema(@Nullable String space) { + if (F.isEmpty(space)) + return "PUBLIC"; + + return space; + } + + /** {@inheritDoc} */ + @Override public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type) { + if (offheap != null) + throw new UnsupportedOperationException("Index rebuilding is not supported when off-heap memory is used"); + + TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl == null) + return; + + tbl.tbl.rebuildIndexes(); + } + + /** {@inheritDoc} */ + @Override public long size(@Nullable String spaceName, GridQueryTypeDescriptor type, + GridIndexingQueryFilter filters) throws GridException { + TableDescriptor tbl = tableDescriptor(spaceName, type); + + if (tbl == null) + return -1; + + IgniteSpiCloseableIterator<List<?>> iter = queryFields(spaceName, + "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null).iterator(); + + return ((Number)iter.next().get(0)).longValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("NonThreadSafeLazyInitialization") + @Override public void start(GridKernalContext ctx) throws GridException { + if (log.isDebugEnabled()) + log.debug("Starting cache query index..."); + + if (ctx != null) { // This is allowed in some tests. + this.ctx = ctx; + + GridQueryConfiguration cfg0 = ctx.config().getQueryConfiguration(); + + if (cfg0 != null) + cfg = cfg0; + + for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration()) + registerSpace(cacheCfg.getName()); + } + + System.setProperty("h2.serializeJavaObject", "false"); + + if (SysProperties.serializeJavaObject) { + U.warn(log, "Serialization of Java objects in H2 was enabled."); + + SysProperties.serializeJavaObject = false; + } + + if (cfg.isUseOptimizedSerializer()) + Utils.serializer = h2Serializer(); + + long maxOffHeapMemory = cfg.getMaxOffHeapMemory(); + + if (maxOffHeapMemory != -1) { + assert maxOffHeapMemory >= 0 : maxOffHeapMemory; + + offheap = new GridUnsafeMemory(maxOffHeapMemory); + } + + SB opt = new SB(); + + opt.a(DFLT_DB_OPTIONS).a(OPTIMIZED_DB_OPTIONS); + + String dbName = UUID.randomUUID().toString(); + + dbUrl = "jdbc:h2:mem:" + dbName + opt; + + try { + Class.forName("org.h2.Driver"); + } + catch (ClassNotFoundException e) { + throw new GridException("Failed to find org.h2.Driver class", e); + } + + for (String schema : schemaNames) + createSchemaIfAbsent(schema); + + try { + createSqlFunctions(); + runInitScript(); + + if (getString(GG_H2_DEBUG_CONSOLE) != null) { + Connection c = DriverManager.getConnection(dbUrl); + + WebServer webSrv = new WebServer(); + Server web = new Server(webSrv, "-webPort", "0"); + web.start(); + String url = webSrv.addSession(c); + + try { + Server.openBrowser(url); + } + catch (Exception e) { + U.warn(log, "Failed to open browser: " + e.getMessage()); + } + } + } + catch (SQLException e) { + throw new GridException(e); + } + +// registerMBean(gridName, this, GridH2IndexingSpiMBean.class); TODO + } + + /** + * @return Serializer. + */ + protected JavaObjectSerializer h2Serializer() { + return new JavaObjectSerializer() { + @Override public byte[] serialize(Object obj) throws Exception { + return marshaller.marshal(obj); + } + + @Override public Object deserialize(byte[] bytes) throws Exception { + return marshaller.unmarshal(bytes, null); + } + }; + } + + /** + * Runs initial script. + * + * @throws GridException If failed. + * @throws SQLException If failed. + */ + private void runInitScript() throws GridException, SQLException { + String initScriptPath = cfg.getInitialScriptPath(); + + if (initScriptPath == null) + return; + + try (PreparedStatement p = connectionForThread(null).prepareStatement("RUNSCRIPT FROM ? CHARSET 'UTF-8'")) { + p.setString(1, initScriptPath); + + p.execute(); + } + } + + /** + * Registers SQL functions. + * + * @throws SQLException If failed. + * @throws GridException If failed. + */ + private void createSqlFunctions() throws SQLException, GridException { + Class<?>[] idxCustomFuncClss = cfg.getIndexCustomFunctionClasses(); + + if (F.isEmpty(idxCustomFuncClss)) + return; + + for (Class<?> cls : idxCustomFuncClss) { + for (Method m : cls.getDeclaredMethods()) { + GridCacheQuerySqlFunction ann = m.getAnnotation(GridCacheQuerySqlFunction.class); + + if (ann != null) { + int modifiers = m.getModifiers(); + + if (!Modifier.isStatic(modifiers) || !Modifier.isPublic(modifiers)) + throw new GridException("Method " + m.getName() + " must be public static."); + + String alias = ann.alias().isEmpty() ? m.getName() : ann.alias(); + + String clause = "CREATE ALIAS " + alias + (ann.deterministic() ? " DETERMINISTIC FOR \"" : + " FOR \"") + cls.getName() + '.' + m.getName() + '"'; + + Collection<String> schemas = new ArrayList<>(schemaNames); + + if (!schemaNames.contains(schema(null))) + schemas.add(schema(null)); + + for (String schema : schemas) { + Connection c = connectionForThread(schema); + + Statement s = c.createStatement(); + + s.execute(clause); + + s.close(); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws GridException { + if (log.isDebugEnabled()) + log.debug("Stopping cache query index..."); + +// unregisterMBean(); TODO + + Connection conn = connectionForThread(null); + + for (ConcurrentMap<String, TableDescriptor> m : schemas.values()) { + for (TableDescriptor desc : m.values()) { + desc.tbl.close(); + + if (desc.luceneIdx != null) + U.closeQuiet(desc.luceneIdx); + } + } + + if (conn != null) { + Statement stmt = null; + + try { + stmt = conn.createStatement(); + + stmt.execute("DROP ALL OBJECTS DELETE FILES"); + stmt.execute("SHUTDOWN"); + } + catch (SQLException e) { + throw new GridException("Failed to shutdown database.", e); + } + finally { + U.close(stmt, log); + } + } + + for (Connection c : conns) + U.close(c, log); + + conns.clear(); + schemas.clear(); + rowCache.clear(); + + if (log.isDebugEnabled()) + log.debug("Cache query index stopped."); + } + + /** + * @param spaceName Space name. + * @return {@code true} If primitive keys must be indexed. + */ + public boolean isIndexPrimitiveKey(@Nullable String spaceName) { + GridCacheQueryConfiguration cfg = cacheQueryConfiguration(spaceName); + + return cfg != null && cfg.isIndexPrimitiveKey(); + } + + /** + * @param spaceName Space name. + * @return {@code true} If primitive values must be indexed. + */ + public boolean isIndexPrimitiveValue(String spaceName) { + GridCacheQueryConfiguration cfg = cacheQueryConfiguration(spaceName); + + return cfg != null && cfg.isIndexPrimitiveValue(); + } + + /** {@inheritDoc} */ + public boolean isIndexFixedTyping(String spaceName) { + GridCacheQueryConfiguration cfg = cacheQueryConfiguration(spaceName); + + return cfg != null && cfg.isIndexFixedTyping(); + } + + /** {@inheritDoc} */ + public boolean isEscapeAll(String spaceName) { + GridCacheQueryConfiguration cfg = cacheQueryConfiguration(spaceName); + + return cfg != null && cfg.isEscapeAll(); + } + + /** + * @param spaceName Space name. + * @return Cache query configuration. + */ + @Nullable private GridCacheQueryConfiguration cacheQueryConfiguration(String spaceName) { + return ctx == null ? null : ctx.cache().internalCache(spaceName).configuration().getQueryConfiguration(); + } + + /** {@inheritDoc} */ + public int getMaxOffheapRowsCacheSize() { + return (int)rowCache.getMaxMemory(); + } + + /** {@inheritDoc} */ + public int getOffheapRowsCacheSize() { + return (int)rowCache.getUsedMemory(); + } + + /** {@inheritDoc} */ + public long getAllocatedOffHeapMemory() { + return offheap == null ? -1 : offheap.allocatedSize(); + } + + /** + * @param spaceName Space name. + */ + public void registerSpace(String spaceName) { + schemaNames.add(schema(spaceName)); + } + + /** + * Wrapper to store connection and flag is schema set or not. + */ + private static class ConnectionWrapper { + /** */ + private Connection conn; + + /** */ + private volatile String schema; + + /** + * @param conn Connection to use. + */ + ConnectionWrapper(Connection conn) { + this.conn = conn; + } + + /** + * @return Schema name if schema is set, null otherwise. + */ + public String schema() { + return schema; + } + + /** + * @param schema Schema name set on this connection. + */ + public void schema(@Nullable String schema) { + this.schema = schema; + } + + /** + * @return Connection. + */ + public Connection connection() { + return conn; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ConnectionWrapper.class, this); + } + } + + /** + * Enum that helps to map java types to database types. + */ + private enum DBTypeEnum { + /** */ + INT("INT"), + + /** */ + BOOL("BOOL"), + + /** */ + TINYINT("TINYINT"), + + /** */ + SMALLINT("SMALLINT"), + + /** */ + BIGINT("BIGINT"), + + /** */ + DECIMAL("DECIMAL"), + + /** */ + DOUBLE("DOUBLE"), + + /** */ + REAL("REAL"), + + /** */ + TIME("TIME"), + + /** */ + TIMESTAMP("TIMESTAMP"), + + /** */ + DATE("DATE"), + + /** */ + VARCHAR("VARCHAR"), + + /** */ + CHAR("CHAR"), + + /** */ + BINARY("BINARY"), + + /** */ + UUID("UUID"), + + /** */ + ARRAY("ARRAY"), + + /** */ + GEOMETRY("GEOMETRY"), + + /** */ + OTHER("OTHER"); + + /** Map of Class to enum. */ + private static final Map<Class<?>, DBTypeEnum> map = new HashMap<>(); + + /** + * Initialize map of DB types. + */ + static { + map.put(int.class, INT); + map.put(Integer.class, INT); + map.put(boolean.class, BOOL); + map.put(Boolean.class, BOOL); + map.put(byte.class, TINYINT); + map.put(Byte.class, TINYINT); + map.put(short.class, SMALLINT); + map.put(Short.class, SMALLINT); + map.put(long.class, BIGINT); + map.put(Long.class, BIGINT); + map.put(BigDecimal.class, DECIMAL); + map.put(double.class, DOUBLE); + map.put(Double.class, DOUBLE); + map.put(float.class, REAL); + map.put(Float.class, REAL); + map.put(Time.class, TIME); + map.put(Timestamp.class, TIMESTAMP); + map.put(java.util.Date.class, TIMESTAMP); + map.put(java.sql.Date.class, DATE); + map.put(char.class, CHAR); + map.put(Character.class, CHAR); + map.put(String.class, VARCHAR); + map.put(UUID.class, UUID); + map.put(byte[].class, BINARY); + } + + /** */ + private final String dbType; + + /** + * Constructs new instance. + * + * @param dbType DB type name. + */ + DBTypeEnum(String dbType) { + this.dbType = dbType; + } + + /** + * Resolves enum by class. + * + * @param cls Class. + * @return Enum value. + */ + public static DBTypeEnum fromClass(Class<?> cls) { + DBTypeEnum res = map.get(cls); + + if (res != null) + return res; + + if (DataType.isGeometryClass(cls)) + return GEOMETRY; + + return cls.isArray() && !cls.getComponentType().isPrimitive() ? ARRAY : OTHER; + } + + /** + * Gets DB type name. + * + * @return DB type name. + */ + public String dBTypeAsString() { + return dbType; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DBTypeEnum.class, this); + } + } + + /** + * Information about table in database. + */ + private class TableDescriptor implements GridH2Table.IndexesFactory { + /** */ + private final String fullTblName; + + /** */ + private final GridQueryTypeDescriptor type; + + /** */ + private final String spaceName; + + /** */ + private final String schema; + + /** */ + private GridH2Table tbl; + + /** */ + private GridLuceneIndex luceneIdx; + + /** + * @param spaceName Space name. + * @param type Type descriptor. + */ + TableDescriptor(@Nullable String spaceName, GridQueryTypeDescriptor type) { + this.spaceName = spaceName; + this.type = type; + + schema = GridH2Indexing.schema(spaceName); + + fullTblName = '\"' + schema + "\"." + escapeName(type.name(), isEscapeAll(spaceName)); + } + + /** + * @return Schema name. + */ + public String schema() { + return schema; + } + + /** + * @return Database table name. + */ + String fullTableName() { + return fullTblName; + } + + /** + * @return Database table name. + */ + String name() { + return type.name(); + } + + /** + * @return Type. + */ + GridQueryTypeDescriptor type() { + return type; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TableDescriptor.class, this); + } + + /** {@inheritDoc} */ + @Override public ArrayList<Index> createIndexes(GridH2Table tbl) { + this.tbl = tbl; + + ArrayList<Index> idxs = new ArrayList<>(); + + idxs.add(new GridH2TreeIndex("_key_PK", tbl, true, KEY_COL, VAL_COL, tbl.indexColumn(0, ASCENDING))); + + if (type().valueClass() == String.class) { + try { + luceneIdx = new GridLuceneIndex(marshaller, offheap, spaceName, type, true); + } + catch (GridException e1) { + throw new GridRuntimeException(e1); + } + } + + for (Map.Entry<String, GridQueryIndexDescriptor> e : type.indexes().entrySet()) { + String name = e.getKey(); + GridQueryIndexDescriptor idx = e.getValue(); + + if (idx.type() == FULLTEXT) { + try { + luceneIdx = new GridLuceneIndex(marshaller, offheap, spaceName, type, true); + } + catch (GridException e1) { + throw new GridRuntimeException(e1); + } + } + else { + IndexColumn[] cols = new IndexColumn[idx.fields().size()]; + + int i = 0; + + boolean escapeAll = isEscapeAll(spaceName); + + for (String field : idx.fields()) { + // H2 reserved keywords used as column name is case sensitive. + String fieldName = escapeAll ? field : escapeName(field, escapeAll).toUpperCase(); + + Column col = tbl.getColumn(fieldName); + + cols[i++] = tbl.indexColumn(col.getColumnId(), idx.descending(field) ? DESCENDING : ASCENDING); + } + + if (idx.type() == SORTED) + idxs.add(new GridH2TreeIndex(name, tbl, false, KEY_COL, VAL_COL, cols)); + else if (idx.type() == GEO_SPATIAL) + idxs.add(new GridH2SpatialIndex(tbl, name, cols, KEY_COL, VAL_COL)); + else + throw new IllegalStateException(); + } + } + + return idxs; + } + } + + /** + * Special field set iterator based on database result set. + */ + private static class FieldsIterator extends GridH2ResultSetIterator<List<?>> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param data Data. + * @throws GridException If failed. + */ + protected FieldsIterator(ResultSet data) throws GridException { + super(data); + } + + /** {@inheritDoc} */ + @Override protected List<?> createRow() { + ArrayList<Object> res = new ArrayList<>(row.length); + + Collections.addAll(res, row); + + return res; + } + } + + /** + * Special key/value iterator based on database result set. + */ + private static class KeyValIterator<K, V> extends GridH2ResultSetIterator<IgniteBiTuple<K, V>> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param data Data array. + * @throws GridException If failed. + */ + protected KeyValIterator(ResultSet data) throws GridException { + super(data); + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<K, V> createRow() { + K key = (K)row[0]; + V val = (V)row[1]; + + return new IgniteBiTuple<>(key, val); + } + } + + /** + * Field descriptor. + */ + private static class SqlFieldMetadata implements GridQueryFieldMetadata { + /** */ + private static final long serialVersionUID = 0L; + + /** Schema name. */ + private String schemaName; + + /** Type name. */ + private String typeName; + + /** Name. */ + private String name; + + /** Type. */ + private String type; + + /** + * Required by {@link Externalizable}. + */ + public SqlFieldMetadata() { + // No-op + } + + /** + * @param schemaName Schema name. + * @param typeName Type name. + * @param name Name. + * @param type Type. + */ + SqlFieldMetadata(@Nullable String schemaName, @Nullable String typeName, String name, String type) { + assert name != null; + assert type != null; + + this.schemaName = schemaName; + this.typeName = typeName; + this.name = name; + this.type = type; + } + + /** {@inheritDoc} */ + @Override public String schemaName() { + return schemaName; + } + + /** {@inheritDoc} */ + @Override public String typeName() { + return typeName; + } + + /** {@inheritDoc} */ + @Override public String fieldName() { + return name; + } + + /** {@inheritDoc} */ + @Override public String fieldTypeName() { + return type; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, schemaName); + U.writeString(out, typeName); + U.writeString(out, name); + U.writeString(out, type); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + schemaName = U.readString(in); + typeName = U.readString(in); + name = U.readString(in); + type = U.readString(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlFieldMetadata.class, this); + } + } + + /** + * Database schema object. + */ + private static class Schema extends ConcurrentHashMap8<String, TableDescriptor> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final String spaceName; + + /** + * @param spaceName Space name. + */ + private Schema(@Nullable String spaceName) { + this.spaceName = spaceName; + } + } + + /** + * Row descriptor. + */ + private class RowDescriptor implements GridH2RowDescriptor { + /** */ + private final GridQueryTypeDescriptor type; + + /** */ + private final String[] fields; + + /** */ + private final int[] fieldTypes; + + /** */ + private final int keyType; + + /** */ + private final int valType; + + /** */ + private final Schema schema; + + /** */ + private final int keyCols; + + /** */ + private final GridUnsafeGuard guard = offheap == null ? null : new GridUnsafeGuard(); + + /** + * @param type Type descriptor. + * @param schema Schema. + * @param keyAsObj Store key as java object. + */ + RowDescriptor(GridQueryTypeDescriptor type, Schema schema, boolean keyAsObj) { + assert type != null; + assert schema != null; + + this.type = type; + this.schema = schema; + + keyCols = type.keyFields().size(); + + Map<String, Class<?>> allFields = new LinkedHashMap<>(); + + allFields.putAll(type.keyFields()); + allFields.putAll(type.valueFields()); + + fields = allFields.keySet().toArray(new String[allFields.size()]); + + fieldTypes = new int[fields.length]; + + Class[] classes = allFields.values().toArray(new Class[fields.length]); + + for (int i = 0; i < fieldTypes.length; i++) + fieldTypes[i] = DataType.getTypeFromClass(classes[i]); + + keyType = keyAsObj ? Value.JAVA_OBJECT : DataType.getTypeFromClass(type.keyClass()); + valType = DataType.getTypeFromClass(type.valueClass()); + } + + /** {@inheritDoc} */ + @Override public GridUnsafeGuard guard() { + return guard; + } + + /** {@inheritDoc} */ + @Override public void cache(GridH2KeyValueRowOffheap row) { + long ptr = row.pointer(); + + assert ptr > 0 : ptr; + + rowCache.put(ptr, row); + } + + /** {@inheritDoc} */ + @Override public void uncache(long ptr) { + rowCache.remove(ptr); + } + + /** {@inheritDoc} */ + @Override public GridUnsafeMemory memory() { + return offheap; + } + + /** {@inheritDoc} */ + @Override public GridH2Indexing owner() { + return GridH2Indexing.this; + } + + /** {@inheritDoc} */ + @Override public GridH2AbstractKeyValueRow createRow(Object key, @Nullable Object val, long expirationTime) + throws GridException { + try { + return offheap == null ? + new GridH2KeyValueRowOnheap(this, key, keyType, val, valType, expirationTime) : + new GridH2KeyValueRowOffheap(this, key, keyType, val, valType, expirationTime); + } + catch (ClassCastException e) { + throw new GridException("Failed to convert key to SQL type. " + + "Please make sure that you always store each value type with the same key type or disable " + + "'defaultIndexFixedTyping' property on GridH2IndexingSpi.", e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Object readFromSwap(Object key) throws GridException { + GridCache<Object, ?> cache = ctx.cache().cache(schema.spaceName); + + GridCacheContext cctx = ((GridCacheProxyImpl)cache).context(); + + if (cctx.isNear()) + cctx = cctx.near().dht().context(); + + GridCacheSwapEntry e = cctx.swap().read(key); + + return e != null ? e.value() : null; + } + + /** {@inheritDoc} */ + @Override public int valueType() { + return valType; + } + + /** {@inheritDoc} */ + @Override public int fieldsCount() { + return fields.length; + } + + /** {@inheritDoc} */ + @Override public int fieldType(int col) { + return fieldTypes[col]; + } + + /** {@inheritDoc} */ + @Override public Object columnValue(Object obj, int col) { + try { + return type.value(obj, fields[col]); + } + catch (GridException e) { + throw DbException.convert(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isKeyColumn(int col) { + return keyCols > col; + } + + /** {@inheritDoc} */ + @Override public boolean valueToString() { + return type.valueTextIndex(); + } + + /** {@inheritDoc} */ + @Override public GridH2KeyValueRowOffheap createPointer(long ptr) { + GridH2KeyValueRowOffheap row = rowCache.get(ptr); + + if (row != null) { + assert row.pointer() == ptr : ptr + " " + row.pointer(); + + return row; + } + + return new GridH2KeyValueRowOffheap(this, ptr); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java new file mode 100644 index 0000000..a231144 --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java @@ -0,0 +1,122 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2; + +import org.gridgain.grid.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.sql.*; +import java.util.*; + + +/** + * Iterator over result set. + */ +abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final ResultSet data; + + /** */ + protected final Object[] row; + + /** */ + private boolean hasRow; + + /** + * @param data Data array. + * @throws GridException If failed. + */ + protected GridH2ResultSetIterator(ResultSet data) throws GridException { + this.data = data; + + if (data != null) { + try { + row = new Object[data.getMetaData().getColumnCount()]; + } + catch (SQLException e) { + throw new GridException(e); + } + } + else + row = null; + } + + /** + * @return {@code true} If next row was fetched successfully. + */ + private boolean fetchNext() { + if (data == null) + return false; + + try { + if (!data.next()) + return false; + + for (int c = 0; c < row.length; c++) + row[c] = data.getObject(c + 1); + + return true; + } + catch (SQLException e) { + throw new GridRuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean onHasNext() { + return hasRow || (hasRow = fetchNext()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") + @Override public T onNext() { + if (!hasNext()) + throw new NoSuchElementException(); + + hasRow = false; + + return createRow(); + } + + /** + * @return Row. + */ + protected abstract T createRow(); + + /** {@inheritDoc} */ + @Override public void onRemove() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void onClose() throws GridException { + if (data == null) + // Nothing to close. + return; + + try { + U.closeQuiet(data.getStatement()); + } + catch (SQLException e) { + throw new GridException(e); + } + + U.closeQuiet(data); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString((Class<GridH2ResultSetIterator>)getClass(), this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java new file mode 100644 index 0000000..ee96666 --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -0,0 +1,447 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.opt; + +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.processors.query.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.h2.message.*; +import org.h2.result.*; +import org.h2.value.*; +import org.jetbrains.annotations.*; + +import java.lang.ref.*; +import java.math.*; +import java.sql.Date; +import java.sql.*; +import java.util.*; + +/** + * Table row implementation based on {@link GridQueryTypeDescriptor}. + */ +public abstract class GridH2AbstractKeyValueRow extends GridH2Row { + /** */ + private static final int DEFAULT_COLUMNS_COUNT = 2; + + /** Key column. */ + public static final int KEY_COL = 0; + + /** Value column. */ + public static final int VAL_COL = 1; + + /** */ + protected final GridH2RowDescriptor desc; + + /** */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + protected long expirationTime; + + /** + * Constructor. + * + * @param desc Row descriptor. + * @param key Key. + * @param keyType Key type. + * @param val Value. + * @param valType Value type. + * @param expirationTime Expiration time. + * @throws IgniteSpiException If failed. + */ + protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc, Object key, int keyType, @Nullable Object val, + int valType, long expirationTime) throws IgniteSpiException { + super(wrap(key, keyType), + val == null ? null : wrap(val, valType)); // We remove by key only, so value can be null here. + + this.desc = desc; + this.expirationTime = expirationTime; + } + + /** + * Protected constructor for {@link GridH2KeyValueRowOffheap} + * + * @param desc Row descriptor. + */ + protected GridH2AbstractKeyValueRow(GridH2RowDescriptor desc) { + super(new Value[DEFAULT_COLUMNS_COUNT]); + + this.desc = desc; + } + + /** + * Wraps object to respective {@link Value}. + * + * @param obj Object. + * @param type Value type. + * @return Value. + * @throws IgniteSpiException If failed. + */ + private static Value wrap(Object obj, int type) throws IgniteSpiException { + switch (type) { + case Value.BOOLEAN: + return ValueBoolean.get((Boolean)obj); + case Value.BYTE: + return ValueByte.get((Byte)obj); + case Value.SHORT: + return ValueShort.get((Short)obj); + case Value.INT: + return ValueInt.get((Integer)obj); + case Value.FLOAT: + return ValueFloat.get((Float)obj); + case Value.LONG: + return ValueLong.get((Long)obj); + case Value.DOUBLE: + return ValueDouble.get((Double)obj); + case Value.UUID: + UUID uuid = (UUID)obj; + return ValueUuid.get(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + case Value.DATE: + return ValueDate.get((Date)obj); + case Value.TIME: + return ValueTime.get((Time)obj); + case Value.TIMESTAMP: + if (obj instanceof java.util.Date && !(obj instanceof Timestamp)) + obj = new Timestamp(((java.util.Date) obj).getTime()); + + return GridH2Utils.toValueTimestamp((Timestamp)obj); + case Value.DECIMAL: + return ValueDecimal.get((BigDecimal)obj); + case Value.STRING: + return ValueString.get(obj.toString()); + case Value.BYTES: + return ValueBytes.get((byte[])obj); + case Value.JAVA_OBJECT: + return ValueJavaObject.getNoCopy(obj, null, null); + case Value.ARRAY: + Object[] arr = (Object[])obj; + + Value[] valArr = new Value[arr.length]; + + for (int i = 0; i < arr.length; i++) { + Object o = arr[i]; + + valArr[i] = o == null ? ValueNull.INSTANCE : wrap(o, DataType.getTypeFromClass(o.getClass())); + } + + return ValueArray.get(valArr); + + case Value.GEOMETRY: + return ValueGeometry.getFromGeometry(obj); + } + + throw new IgniteSpiException("Failed to wrap value[type=" + type + ", value=" + obj + "]"); + } + + /** + * @return Expiration time of respective cache entry. + */ + public long expirationTime() { + return expirationTime; + } + + /** {@inheritDoc} */ + @Override public int getColumnCount() { + return DEFAULT_COLUMNS_COUNT + desc.fieldsCount(); + } + + /** + * Should be called to remove reference on value. + * + * @throws IgniteSpiException If failed. + */ + public synchronized void onSwap() throws GridException { + setValue(VAL_COL, null); + } + + /** + * Should be called when entry getting unswapped. + * + * @param val Value. + * @throws GridException If failed. + */ + public synchronized void onUnswap(Object val) throws GridException { + setValue(VAL_COL, wrap(val, desc.valueType())); + } + + /** + * Atomically updates weak value. + * + * @param exp Expected value. + * @param upd New value. + * @return Expected value if update succeeded, unexpected value otherwise. + */ + protected synchronized Value updateWeakValue(Value exp, Value upd) { + Value res = super.getValue(VAL_COL); + + if (res != exp && !(res instanceof WeakValue)) + return res; + + setValue(VAL_COL, new WeakValue(upd)); + + return exp; + } + + /** + * @return Synchronized value. + */ + protected synchronized Value syncValue() { + return super.getValue(VAL_COL); + } + + /** {@inheritDoc} */ + @Override public Value getValue(int col) { + if (col < DEFAULT_COLUMNS_COUNT) { + Value v = super.getValue(col); + + if (col == VAL_COL) { + while ((v = WeakValue.unwrap(v)) == null) { + v = getOffheapValue(VAL_COL); + + if (v != null) { + setValue(VAL_COL, v); + + if (super.getValue(KEY_COL) == null) + cache(); + + return v; + } + + try { + Object valObj = desc.readFromSwap(getValue(KEY_COL).getObject()); + + if (valObj != null) { + Value upd = wrap(valObj, desc.valueType()); + + Value res = updateWeakValue(v, upd); + + if (res == v) { + if (super.getValue(KEY_COL) == null) + cache(); + + return upd; + } + + v = res; + } + else { + // If nothing found in swap then we should be already unswapped. + v = syncValue(); + } + } + catch (GridException e) { + throw new GridRuntimeException(e); + } + } + } + + if (v == null) { + assert col == KEY_COL : col; + + v = getOffheapValue(KEY_COL); + + assert v != null : v; + + setValue(KEY_COL, v); + + if (super.getValue(VAL_COL) == null) + cache(); + } + + assert !(v instanceof WeakValue) : v; + + return v; + } + + col -= DEFAULT_COLUMNS_COUNT; + + assert col >= 0; + + Value v = getValue(desc.isKeyColumn(col) ? KEY_COL : VAL_COL); + + if (v == null) + return null; + + Object obj = v.getObject(); + + Object res = desc.columnValue(obj, col); + + if (res == null) + return ValueNull.INSTANCE; + + try { + return wrap(res, desc.fieldType(col)); + } + catch (IgniteSpiException e) { + throw DbException.convert(e); + } + } + + /** + * Caches this row for reuse. + */ + protected abstract void cache(); + + /** + * @param col Column. + * @return Value read from offheap memory or null if it is impossible. + */ + protected abstract Value getOffheapValue(int col); + + /** {@inheritDoc} */ + @Override public String toString() { + SB sb = new SB("Row@"); + + sb.a(Integer.toHexString(System.identityHashCode(this))); + + Value v = super.getValue(KEY_COL); + sb.a("[ key: ").a(v == null ? "nil" : v.getString()); + + v = WeakValue.unwrap(super.getValue(VAL_COL)); + sb.a(", val: ").a(v == null ? "nil" : v.getString()); + + sb.a(" ][ "); + + if (v != null) { + for (int i = 2, cnt = getColumnCount(); i < cnt; i++) { + v = getValue(i); + + if (i != 2) + sb.a(", "); + + sb.a(v == null ? "nil" : v.getString()); + } + } + + sb.a(" ]"); + + return sb.toString(); + } + + /** {@inheritDoc} */ + @Override public void setKeyAndVersion(SearchRow old) { + assert false; + } + + /** {@inheritDoc} */ + @Override public void setKey(long key) { + assert false; + } + + /** {@inheritDoc} */ + @Override public Row getCopy() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public void setDeleted(boolean deleted) { + assert false; + } + + /** {@inheritDoc} */ + @Override public long getKey() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public void setSessionId(int sesId) { + assert false; + } + + /** {@inheritDoc} */ + @Override public void setVersion(int ver) { + assert false; + } + + /** + * Weak reference to value that was swapped but accessed in indexing SPI. + */ + private static class WeakValue extends Value { + /** + * Unwraps value. + * + * @param v Value. + * @return Unwrapped value. + */ + static Value unwrap(Value v) { + return (v instanceof WeakValue) ? ((WeakValue)v).get() : v; + } + + /** */ + private final WeakReference<Value> ref; + + /** + * @param v Value. + */ + private WeakValue(Value v) { + ref = new WeakReference<>(v); + } + + /** + * @return Referenced value. + */ + public Value get() { + return ref.get(); + } + + /** {@inheritDoc} */ + @Override public String getSQL() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int getType() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public long getPrecision() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int getDisplaySize() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public String getString() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public Object getObject() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public void set(PreparedStatement preparedStatement, int i) throws SQLException { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override protected int compareSecure(Value val, CompareMode compareMode) { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + throw new IllegalStateException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java new file mode 100644 index 0000000..5cc3711 --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2Cursor.java @@ -0,0 +1,62 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.opt; + +import org.h2.index.*; +import org.h2.message.*; +import org.h2.result.*; + +import java.util.*; + +/** + * H2 Cursor implementation. + */ +public class GridH2Cursor implements Cursor { + /** */ + private Iterator<GridH2Row> iter; + + /** */ + private Row row; + + /** + * Constructor. + * + * @param iter Rows iterator. + */ + public GridH2Cursor(Iterator<GridH2Row> iter) { + this.iter = iter; + } + + /** {@inheritDoc} */ + @Override public Row get() { + return row; + } + + /** {@inheritDoc} */ + @Override public SearchRow getSearchRow() { + return get(); + } + + /** {@inheritDoc} */ + @Override public boolean next() { + row = null; + + if (iter.hasNext()) + row = iter.next(); + + return row != null; + } + + /** {@inheritDoc} */ + @Override public boolean previous() { + // Should never be called. + throw DbException.getUnsupportedException("previous"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/922a2bab/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java new file mode 100644 index 0000000..00cb06d --- /dev/null +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java @@ -0,0 +1,198 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.query.h2.opt; + +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.indexing.*; +import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.h2.engine.*; +import org.h2.index.*; +import org.h2.message.*; +import org.h2.result.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Index base. + */ +public abstract class GridH2IndexBase extends BaseIndex { + /** */ + protected static final ThreadLocal<GridIndexingQueryFilter> filters = new ThreadLocal<>(); + + /** */ + protected final int keyCol; + + /** */ + protected final int valCol; + + /** + * @param keyCol Key column. + * @param valCol Value column. + */ + protected GridH2IndexBase(int keyCol, int valCol) { + this.keyCol = keyCol; + this.valCol = valCol; + } + + /** + * Sets key filters for current thread. + * + * @param fs Filters. + */ + public static void setFiltersForThread(GridIndexingQueryFilter fs) { + filters.set(fs); + } + + /** + * If the index supports rebuilding it has to creates its own copy. + * + * @return Rebuilt copy. + * @throws InterruptedException If interrupted. + */ + public GridH2IndexBase rebuild() throws InterruptedException { + return this; + } + + /** + * Put row if absent. + * + * @param row Row. + * @return Existing row or {@code null}. + */ + public abstract GridH2Row put(GridH2Row row); + + /** + * Remove row from index. + * + * @param row Row. + * @return Removed row. + */ + public abstract GridH2Row remove(SearchRow row); + + /** + * Takes or sets existing snapshot to be used in current thread. + * + * @param s Optional existing snapshot to use. + * @return Snapshot. + */ + public Object takeSnapshot(@Nullable Object s) { + return s; + } + + /** + * Releases snapshot for current thread. + */ + public void releaseSnapshot() { + // No-op. + } + + /** + * Filters rows from expired ones and using predicate. + * + * @param iter Iterator over rows. + * @return Filtered iterator. + */ + protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter) { + IgniteBiPredicate<Object, Object> p = null; + + GridIndexingQueryFilter f = filters.get(); + + if (f != null) { + String spaceName = ((GridH2Table)getTable()).spaceName(); + + p = f.forSpace(spaceName); + } + + return new FilteringIterator(iter, U.currentTimeMillis(), p); + } + + /** {@inheritDoc} */ + @Override public long getDiskSpaceUsed() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void checkRename() { + throw DbException.getUnsupportedException("rename"); + } + + /** {@inheritDoc} */ + @Override public void add(Session ses, Row row) { + throw DbException.getUnsupportedException("add"); + } + + /** {@inheritDoc} */ + @Override public void remove(Session ses, Row row) { + throw DbException.getUnsupportedException("remove row"); + } + + /** {@inheritDoc} */ + @Override public void remove(Session ses) { + throw DbException.getUnsupportedException("remove index"); + } + + /** {@inheritDoc} */ + @Override public void truncate(Session ses) { + throw DbException.getUnsupportedException("truncate"); + } + + /** {@inheritDoc} */ + @Override public boolean needRebuild() { + return false; + } + + /** + * Iterator which filters by expiration time and predicate. + */ + protected class FilteringIterator extends GridFilteredIterator<GridH2Row> { + /** */ + private final IgniteBiPredicate<Object, Object> fltr; + + /** */ + private final long time; + + /** + * @param iter Iterator. + * @param time Time for expired rows filtering. + */ + protected FilteringIterator(Iterator<GridH2Row> iter, long time, + IgniteBiPredicate<Object, Object> fltr) { + super(iter); + + this.time = time; + this.fltr = fltr; + } + + /** + * @param row Row. + * @return If this row was accepted. + */ + @SuppressWarnings("unchecked") + @Override protected boolean accept(GridH2Row row) { + if (row instanceof GridH2AbstractKeyValueRow) { + if (((GridH2AbstractKeyValueRow) row).expirationTime() <= time) + return false; + } + + if (fltr == null) + return true; + + Object key = row.getValue(keyCol).getObject(); + Object val = row.getValue(valCol).getObject(); + + assert key != null; + assert val != null; + + return fltr.apply(key, val); + } + } +}