ignite-sql-tests - jdbc
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a06a5575 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a06a5575 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a06a5575 Branch: refs/heads/ignite-sql-tests Commit: a06a55750c42649acba4a27697809dde463542f1 Parents: cfcb9a4 Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Mar 17 05:23:11 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Mar 17 05:23:11 2015 +0300 ---------------------------------------------------------------------- .../ignite/jdbc/JdbcEmptyCacheSelfTest.java | 3 + .../processors/cache/IgniteCacheProxy.java | 4 +- .../processors/cache/QueryCursorImpl.java | 18 +++ .../query/jdbc/GridCacheQueryJdbcTask.java | 5 +- .../processors/query/GridQueryProcessor.java | 14 +- .../processors/query/h2/IgniteH2Indexing.java | 143 +++++++------------ .../query/h2/sql/GridSqlQueryParser.java | 33 +++-- .../query/h2/sql/GridSqlQuerySplitter.java | 19 ++- .../h2/twostep/GridReduceQueryExecutor.java | 47 +++--- 9 files changed, 150 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java index 9742999..3869ddd 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java @@ -54,6 +54,9 @@ public class JdbcEmptyCacheSelfTest extends GridCommonAbstractTest { cache.setCacheMode(PARTITIONED); cache.setBackups(1); cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setIndexedTypes( + Byte.class, Byte.class + ); cfg.setCacheConfiguration(cache); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index aaa63fd..3216ccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -489,8 +489,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) { - return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( - ctx.name(), q.getSql(), q.getArgs())); + return ctx.kernalContext().query().queryLocalFields( + ctx.name(), q.getSql(), q.getArgs()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 62e7376..7cb9efc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; import java.util.*; @@ -32,6 +33,9 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> { /** */ private boolean iterTaken; + /** */ + private Collection<GridQueryFieldMetadata> fieldsMeta; + /** * @param iter Iterator. */ @@ -95,4 +99,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> { } } } + + /** + * @param fieldsMeta SQL Fields query result metadata. + */ + public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) { + this.fieldsMeta = fieldsMeta; + } + + /** + * @return SQL Fields query result metadata. + */ + public Collection<GridQueryFieldMetadata> fieldsMeta() { + return fieldsMeta; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index b53a9e1..332c649 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -194,7 +195,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { QueryCursor<List<?>> cursor = cache.queryFields(qry); - Collection<GridQueryFieldMetadata> meta = null; // TODO + Collection<GridQueryFieldMetadata> meta = ((QueryCursorImpl<List<?>>)cursor).fieldsMeta(); + + assert meta != null; tbls = new ArrayList<>(meta.size()); cols = new ArrayList<>(meta.size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 11a9f2c..aa924c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -555,13 +554,12 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param args Arguments. * @return Iterator. */ - public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] args) { + public QueryCursor<List<?>> queryLocalFields(String space, String sql, Object[] args) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - IgniteSpiCloseableIterator<List<?>> iterator = - idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator(); + GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -579,10 +577,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { null)); } - return iterator; + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(res.iterator()); + + cursor.fieldsMeta(res.metaData()); + + return cursor; } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw new CacheException(e); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index c3a3da3..1e431db 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,7 +65,6 @@ import java.sql.*; import java.text.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*; @@ -496,18 +495,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (rs != null) { try { - ResultSetMetaData rsMeta = rs.getMetaData(); - - meta = new ArrayList<>(rsMeta.getColumnCount()); - - for (int i = 1; i <= rsMeta.getColumnCount(); i++) { - String schemaName = rsMeta.getSchemaName(i); - String typeName = rsMeta.getTableName(i); - String name = rsMeta.getColumnLabel(i); - String type = rsMeta.getColumnClassName(i); - - meta.add(new SqlFieldMetadata(schemaName, typeName, name, type)); - } + meta = meta(rs.getMetaData()); } catch (SQLException e) { throw new IgniteSpiException("Failed to get meta data.", e); @@ -522,6 +510,26 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param rsMeta Metadata. + * @return List of fields metadata. + * @throws SQLException If failed. + */ + private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException { + ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount()); + + for (int i = 1; i <= rsMeta.getColumnCount(); i++) { + String schemaName = rsMeta.getSchemaName(i); + String typeName = rsMeta.getTableName(i); + String name = rsMeta.getColumnLabel(i); + String type = rsMeta.getColumnClassName(i); + + meta.add(new SqlFieldMetadata(schemaName, typeName, name, type)); + } + + return meta; + } + + /** * @param stmt Prepared statement. * @return Command type. */ @@ -739,12 +747,38 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) { Connection c = connectionForSpace(space); - GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(c, sqlQry, params); + PreparedStatement stmt; + + try { + stmt = c.prepareStatement(sqlQry); + } + catch (SQLException e) { + throw new CacheException("Failed to parse query: " + sqlQry, e); + } + + GridCacheTwoStepQuery twoStepQry; + Collection<GridQueryFieldMetadata> meta; + + try { + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params); + + meta = meta(stmt.getMetaData()); + } + catch (SQLException e) { + throw new CacheException(e); + } + finally { + U.close(stmt, log); + } if (log.isDebugEnabled()) log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry); - return queryTwoStep(space, twoStepQry); + QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry); + + cursor.fieldsMeta(meta); + + return cursor; } /** @@ -1049,7 +1083,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (Utils.serializer != null) U.warn(log, "Custom H2 serialization is already configured, will override."); - Utils.serializer = h2Serializer(ctx != null && ctx.deploy().enabled()); + Utils.serializer = h2Serializer(); String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString(); @@ -1097,83 +1131,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * @param p2pEnabled If peer-deployment is enabled. * @return Serializer. */ - protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) { - return p2pEnabled ? - new JavaObjectSerializer() { - /** */ - private volatile Map<ClassLoader, Byte> ldr2id = Collections.emptyMap(); - - /** */ - private volatile Map<Byte, ClassLoader> id2ldr = Collections.emptyMap(); - - /** */ - private byte ldrIdGen = Byte.MIN_VALUE; - - /** */ - private final Lock lock = new ReentrantLock(); - - @Override public byte[] serialize(Object obj) throws Exception { - ClassLoader ldr = obj.getClass().getClassLoader(); - - Byte ldrId = ldr2id.get(ldr); - - if (ldrId == null) { - lock.lock(); - - try { - ldrId = ldr2id.get(ldr); - - if (ldrId == null) { - ldrId = ldrIdGen++; - - if (id2ldr.containsKey(ldrId)) // Overflow. - throw new IgniteException("Failed to add new peer-to-peer class loader."); - - Map<Byte, ClassLoader> id2ldr0 = new HashMap<>(id2ldr); - Map<ClassLoader, Byte> ldr2id0 = new IdentityHashMap<>(ldr2id); - - id2ldr0.put(ldrId, ldr); - ldr2id0.put(ldr, ldrId); - - ldr2id = ldr2id0; - id2ldr = id2ldr0; - } - } - finally { - lock.unlock(); - } - } - - byte[] bytes = marshaller.marshal(obj); - - int len = bytes.length; - - bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId. - - bytes[len] = ldrId; - - return bytes; - } - - @Override public Object deserialize(byte[] bytes) throws Exception { - int last = bytes.length - 1; - - byte ldrId = bytes[last]; - - ClassLoader ldr = id2ldr.get(ldrId); - - if (ldr == null) - throw new IllegalStateException("Class loader was not found: " + ldrId); - - bytes = Arrays.copyOf(bytes, last); // Trim the last byte. - - return marshaller.unmarshal(bytes, ldr); - } - } : - new JavaObjectSerializer() { + protected JavaObjectSerializer h2Serializer() { + return new JavaObjectSerializer() { @Override public byte[] serialize(Object obj) throws Exception { return marshaller.marshal(obj); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index a8c83d6..2e2f9c3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.sql; import org.apache.ignite.*; +import org.h2.command.*; import org.h2.command.dml.*; import org.h2.engine.*; import org.h2.expression.*; @@ -28,7 +29,6 @@ import org.h2.value.*; import org.jetbrains.annotations.*; import java.lang.reflect.*; -import java.sql.*; import java.util.*; import java.util.Set; @@ -151,17 +151,34 @@ public class GridSqlQueryParser { private static final Getter<JavaFunction, FunctionAlias> FUNC_ALIAS = getter(JavaFunction.class, "functionAlias"); /** */ + private static final Getter<JdbcPreparedStatement,Command> COMMAND = getter(JdbcPreparedStatement.class, "command"); + + /** */ + private static volatile Getter<Command,Prepared> prepared; + + /** */ private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>(); /** - * @param conn Connection. - * @param select Select query. - * @return Parsed select query. + * @param stmt Prepared statement. + * @return Parsed select. */ - public static GridSqlSelect parse(Connection conn, String select) { - Session ses = (Session)((JdbcConnection)conn).getSession(); + public static GridSqlSelect parse(JdbcPreparedStatement stmt) { + Command cmd = COMMAND.get(stmt); + + Getter<Command,Prepared> p = prepared; + + if (p == null) { + Class<? extends Command> cls = cmd.getClass(); + + assert cls.getSimpleName().equals("CommandContainer"); + + prepared = p = getter(cls, "prepared"); + } + + Prepared select = p.get(cmd); - return new GridSqlQueryParser().parse((Select)ses.prepare(select)); + return new GridSqlQueryParser().parse((Select)select); } /** @@ -510,7 +527,7 @@ public class GridSqlQueryParser { * @param cls Class. * @param fldName Fld name. */ - private static <T, R> Getter<T, R> getter(Class<T> cls, String fldName) { + private static <T, R> Getter<T, R> getter(Class<? extends T> cls, String fldName) { Field field; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 019ed59..47e5e05 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.h2.sql; import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.h2.jdbc.*; import org.h2.value.*; -import java.sql.*; import java.util.*; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.*; @@ -56,16 +56,15 @@ public class GridSqlQuerySplitter { } /** - * @param conn Connection. - * @param query Query. + * @param stmt Prepared statement. * @param params Parameters. * @return Two step query. */ - public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) { + public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params) { if (params == null) params = GridCacheSqlQuery.EMPTY_PARAMS; - GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query); + GridSqlSelect srcQry = GridSqlQueryParser.parse(stmt); final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO @@ -299,16 +298,14 @@ public class GridSqlQuerySplitter { String mapColAlias = columnName(idx); String rdcColAlias; - if (alias == null) { // Wrap map column with generated alias if none. + if (alias == null) // Original column name for reduce column. rdcColAlias = el instanceof GridSqlColumn ? ((GridSqlColumn)el).columnName() : mapColAlias; - - alias = alias(mapColAlias, el); // `el` is known not to be alias. - - mapSelect.set(idx, alias); - } else // Set initial alias for reduce column. rdcColAlias = alias.alias(); + // Always wrap map column into generated alias. + mapSelect.set(idx, alias(mapColAlias, el)); // `el` is known not to be an alias. + if (idx < rdcSelect.length) { // SELECT __C0 AS orginal_alias GridSqlElement rdcEl = column(mapColAlias); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f3d6bfc..4c1dde7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -200,24 +200,37 @@ public class GridReduceQueryExecutor implements GridMessageListener { GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - idx.addPage(new GridResultPage(node.id(), msg, false) { - @Override public void fetchNextPage() { - if (r.rmtErr != null) - throw new CacheException("Next page fetch failed.", r.rmtErr); - - try { - GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); - - if (node.isLocal()) - h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); - else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - throw new CacheException(e); + GridResultPage page; + + try { + page = new GridResultPage(node.id(), msg, false) { + @Override public void fetchNextPage() { + if (r.rmtErr != null) + throw new CacheException("Next page fetch failed.", r.rmtErr); + + try { + GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); + + if (node.isLocal()) + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } } - } - }); + }; + } + catch (Exception e) { + U.error(log, "Error in message.", e); + + fail(r, node.id(), "Error in message."); + + return; + } + + idx.addPage(page); if (msg.allRows() != -1) // Only the first page contains row count. r.latch.countDown();