ignite-gg-9933 - keep portable flag fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f16eadee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f16eadee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f16eadee Branch: refs/heads/ignite-gg-9933 Commit: f16eadee1c230170768448114a05358542b1ab05 Parents: 89ac9dc Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Mar 20 02:03:15 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Mar 20 02:03:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 10 ++- .../cache/query/GridCacheQueriesImpl.java | 2 +- .../cache/query/GridCacheTwoStepQuery.java | 20 ++++++ .../processors/query/GridQueryIndexing.java | 20 +++--- .../query/GridQueryPortableFieldsIterator.java | 70 ++++++++++++++++++++ .../processors/query/GridQueryProcessor.java | 48 ++++++++------ .../processors/query/h2/IgniteH2Indexing.java | 32 ++++++--- .../h2/twostep/GridReduceQueryExecutor.java | 13 ++-- 8 files changed, 160 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3216ccc..53e9a00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -422,7 +422,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (isReplicatedDataNode() || ctx.isLocal()) return doLocalQuery(p); - return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); + return ctx.kernalContext().query().queryTwoStep(ctx, p); } return query(qry, projection(false)); @@ -462,7 +462,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (isReplicatedDataNode() || ctx.isLocal()) return doLocalFieldsQuery(qry); - return ctx.kernalContext().query().queryTwoStep(ctx.name(), qry.getSql(), qry.getArgs()); + return ctx.kernalContext().query().queryTwoStep(ctx, qry); } catch (Exception e) { if (e instanceof CacheException) @@ -480,8 +480,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) { - return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( - ctx.name(), p.getType(), p.getSql(), p.getArgs())); + return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(ctx, p)); } /** @@ -489,8 +488,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) { - return ctx.kernalContext().query().queryLocalFields( - ctx.name(), q.getSql(), q.getArgs()); + return ctx.kernalContext().query().queryLocalFields(ctx, q); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java index a018e98..ae25c1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java @@ -165,7 +165,7 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext /** {@inheritDoc} */ @Override public QueryCursor<List<?>> executeTwoStepQuery(String space, String sqlQry, Object[] params) { - return ctx.kernalContext().query().queryTwoStep(space, sqlQry, params); + return ctx.kernalContext().query().queryTwoStep(ctx, new SqlFieldsQuery(sqlQry).setArgs(params)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 585d78e..11b2057 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -34,6 +34,9 @@ public class GridCacheTwoStepQuery implements Serializable { private static final long serialVersionUID = 0L; /** */ + public static final int DFLT_PAGE_SIZE = 1000; + + /** */ @GridToStringInclude private Map<String, GridCacheSqlQuery> mapQrys; @@ -41,6 +44,9 @@ public class GridCacheTwoStepQuery implements Serializable { @GridToStringInclude private GridCacheSqlQuery reduce; + /** */ + private int pageSize = DFLT_PAGE_SIZE; + /** * @param qry Reduce query. * @param params Reduce query parameters. @@ -50,6 +56,20 @@ public class GridCacheTwoStepQuery implements Serializable { } /** + * @param pageSize Page size. + */ + public void pageSize(int pageSize) { + this.pageSize = pageSize; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** * @param alias Alias. * @param qry SQL Query. * @param params Query parameters. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index fb8f4b8..2caae45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.lang.*; @@ -53,32 +54,29 @@ public interface GridQueryIndexing { /** * Runs two step query. * - * @param space Space name. + * @param cctx Cache context. * @param qry Query. * @return Cursor. */ - public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry); + public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry); /** * Parses SQL query into two step query and executes it. * - * @param space Space. - * @param sqlQry Query. - * @param params Parameters. + * @param cctx Cache context. + * @param qry Query. * @return Cursor. */ - public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params); + public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, SqlFieldsQuery qry); /** * Parses SQL query into two step query and executes it. * - * @param space Space. - * @param type Type name. - * @param sqlQry Query. - * @param params Parameters. + * @param cctx Cache context. + * @param qry Query. * @return Cursor. */ - public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, Object[] params); + public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry); /** * Queries individual fields (generally used by JDBC drivers). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java new file mode 100644 index 0000000..8ccebe2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Deserializes portable objects if needed. + */ +public class GridQueryPortableFieldsIterator implements Iterator<List<?>>, AutoCloseable { + /** */ + private final Iterator<List<?>> iter; + + /** */ + private final GridCacheContext<?,?> cctx; + + /** */ + private final boolean keepPortable; + + /** + * @param iter Iterator. + * @param cctx Cache context. + * @param keepPortable Keep portable. + */ + public GridQueryPortableFieldsIterator(Iterator<List<?>> iter, GridCacheContext<?,?> cctx, boolean keepPortable) { + this.iter = iter; + this.cctx = cctx; + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + if (iter instanceof AutoCloseable) + U.closeQuiet((AutoCloseable)iter); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return iter.hasNext(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public List<?> next() { + return (List<?>)cctx.unwrapPortablesIfNeeded((Collection<Object>)iter.next(), keepPortable); + } + + /** {@inheritDoc} */ + @Override public void remove() { + iter.remove(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 068ebe5..547b707 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -433,7 +433,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryTwoStep(space, qry); + return idx.queryTwoStep(ctx.cache().internalCache(space).context(), qry); } finally { busyLock.leaveBusy(); @@ -441,17 +441,16 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space. - * @param sqlQry Query. - * @param params Parameters. + * @param cctx Cache context. + * @param qry Query. * @return Cursor. */ - public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) { + public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryTwoStep(space, sqlQry, params); + return idx.queryTwoStep(cctx, qry); } finally { busyLock.leaveBusy(); @@ -459,18 +458,16 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space. - * @param type Type. - * @param sqlQry Query. - * @param params Parameters. + * @param cctx Cache context. + * @param qry Query. * @return Cursor. */ - public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, Object[] params) { + public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryTwoStep(space, type, sqlQry, params); + return idx.queryTwoStep(cctx, qry); } finally { busyLock.leaveBusy(); @@ -478,17 +475,20 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space. - * @param type Type. - * @param sqlQry Query. - * @param params Parameters. + * @param cctx Cache context. + * @param qry Query. * @return Cursor. */ - public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(String space, String type, String sqlQry, Object[] params) { + public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> cctx, SqlQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + String space = cctx.name(); + String type = qry.getType(); + String sqlQry = qry.getSql(); + Object[] params = qry.getArgs(); + TypeDescriptor typeDesc = typesByName.get(new TypeName(space, type)); if (typeDesc == null || !typeDesc.registered()) @@ -549,16 +549,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * @param space Space. - * @param sql SQL Query. - * @param args Arguments. + * @param cctx Cache context. + * @param qry Query. * @return Iterator. */ - public QueryCursor<List<?>> queryLocalFields(String space, String sql, Object[] args) { + public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { + String space = cctx.name(); + String sql = qry.getSql(); + Object[] args = qry.getArgs(); + GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -577,7 +580,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { null)); } - QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(res.iterator()); + QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>( + new GridQueryPortableFieldsIterator(res.iterator(), cctx, cctx.keepPortable())); cursor.fieldsMeta(res.metaData()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 1e431db..ee6aedf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -694,29 +694,36 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public QueryCursor<List<?>> queryTwoStep(String space, GridCacheTwoStepQuery qry) { - return rdcQryExec.query(space, qry); + @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) { + return rdcQryExec.query(cctx, qry); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, String type, String sqlQry, - Object[] params) { + @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry) { + String type = qry.getType(); + String space = cctx.name(); + TableDescriptor tblDesc = tableDescriptor(type, space); if (tblDesc == null) throw new CacheException("Failed to find SQL table for type: " + type); - String qry; + String sql; try { - qry = generateQuery(sqlQry, tblDesc); + sql = generateQuery(qry.getSql(), tblDesc); } catch (IgniteCheckedException e) { throw new IgniteException(e); } - final QueryCursor<List<?>> res = queryTwoStep(space, qry, params); + SqlFieldsQuery fqry = new SqlFieldsQuery(sql); + + fqry.setArgs(qry.getArgs()); + fqry.setPageSize(qry.getPageSize()); + + final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry); final Iterator<List<?>> iter0 = res.iterator(); @@ -744,7 +751,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) { + @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) { + String space = cctx.name(); + String sqlQry = qry.getSql(); + Connection c = connectionForSpace(space); PreparedStatement stmt; @@ -760,7 +770,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { Collection<GridQueryFieldMetadata> meta; try { - twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params); + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs()); meta = meta(stmt.getMetaData()); } @@ -774,7 +784,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (log.isDebugEnabled()) log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry); - QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry); + twoStepQry.pageSize(qry.getPageSize()); + + QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(cctx, twoStepQry); cursor.fieldsMeta(meta); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 4c1dde7..9195153 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.sql.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; @@ -237,25 +238,27 @@ public class GridReduceQueryExecutor implements GridMessageListener { } /** - * @param space Space name. + * @param cctx Cache context. * @param qry Query. * @return Cursor. */ - public QueryCursor<List<?>> query(String space, GridCacheTwoStepQuery qry) { + public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) { long qryReqId = reqIdGen.incrementAndGet(); QueryRun r = new QueryRun(); - r.pageSize = 1000; // TODO configure correctly page size + r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize(); r.tbls = new ArrayList<>(qry.mapQueries().size()); + String space = cctx.name(); + r.conn = h2.connectionForSpace(space); // TODO Add topology version. ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); - if (ctx.cache().internalCache(space).context().isReplicated()) { + if (cctx.isReplicated()) { assert dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; dataNodes = dataNodes.forRandom(); // Select random data node to run query on a replicated data. @@ -307,7 +310,7 @@ public class GridReduceQueryExecutor implements GridMessageListener { // dropTable(r.conn, tbl.getName()); TODO } - return new QueryCursorImpl<>(new Iter(res)); + return new QueryCursorImpl<>(new GridQueryPortableFieldsIterator(new Iter(res), cctx, cctx.keepPortable())); } catch (IgniteCheckedException | InterruptedException | RuntimeException e) { U.closeQuiet(r.conn);