ignite-sql-old - fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a50e0524 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a50e0524 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a50e0524 Branch: refs/heads/sprint-1 Commit: a50e0524421dfa267488b8fa3b430c0e59f33ba8 Parents: 42e5786 Author: S.Vladykin <svlady...@gridgain.com> Authored: Sun Feb 15 05:08:02 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Sun Feb 15 05:08:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 111 +++++++++++++------ 1 file changed, 76 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a50e0524/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b4cfc09..1361b8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -264,7 +264,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ @SuppressWarnings("unchecked") - private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { + private QueryCursor<Entry<K,V>> doQuery(Query filter, @Nullable ClusterGroup grp) { final CacheQuery<Map.Entry<K,V>> qry; final CacheQueryFuture<Map.Entry<K,V>> fut; @@ -296,30 +296,27 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V fut = qry.execute(((SpiQuery)filter).getArgs()); } - else - throw new IgniteException("Unsupported query predicate: " + filter); + else if (filter instanceof SqlQuery) { + SqlQuery q = (SqlQuery)filter; - return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { - /** */ - Map.Entry<K,V> cur; + qry = ((GridCacheQueriesEx)delegate.queries()).createSqlQuery(q.getType(), q.getSql()); - @Override protected Entry<K,V> onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); + if (grp != null) + qry.projection(grp); - Map.Entry<K,V> e = cur; + qry.pageSize(q.getPageSize()); - cur = null; + qry.enableDedup(false); + qry.includeBackups(false); - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } - - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } + fut = qry.execute(q.getArgs()); + } + else + throw new IgniteException("Unsupported query predicate: " + filter); - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); + return new QueryCursorImpl<>(new ClIter<Map.Entry<K,V>,Cache.Entry<K,V>>(fut) { + @Override protected Cache.Entry<K,V> convert(Map.Entry<K,V> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); } }); } @@ -387,7 +384,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Local node cluster group. */ private ClusterGroup projection(boolean local) { - return local || ctx.isLocal() || ctx.isReplicated() ? ctx.kernalContext().grid().forLocal() : null; + return local ? ctx.kernalContext().grid().forLocal() : null; } /** {@inheritDoc} */ @@ -400,13 +397,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { validate(qry); - if (qry instanceof SqlQuery) { - return null; // TODO - } - else if (qry instanceof ContinuousQuery) + if (qry instanceof ContinuousQuery) return queryContinuous((ContinuousQuery<K, V>)qry, false); - return query(qry, projection(false)); + return doQuery(qry, projection(false)); } catch (Exception e) { if (e instanceof CacheException) @@ -428,7 +422,17 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { validate(qry); - return null; // TODO + CacheQuery<List<?>> q = ((GridCacheQueriesEx<K,V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(), false); + + q.pageSize(qry.getPageSize()); + + CacheQueryFuture<List<?>> fut = q.execute(qry.getArgs()); + + return new QueryCursorImpl<>(new ClIter<List<?>, List<?>>(fut) { + @Override protected List<?> convert(List<?> row) { + return row; + } + }); } catch (Exception e) { if (e instanceof CacheException) @@ -446,9 +450,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) { - return null; // TODO -// new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( -// ctx.name(), p.getType(), p.getSql(), p.getArgs())); + return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( + ctx.name(), p.getType(), p.getSql(), p.getArgs())); } /** @@ -456,9 +459,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) { - return null; // TODO -// new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( -// ctx.name(), q.getSql(), q.getArgs())); + return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( + ctx.name(), q.getSql(), q.getArgs())); } /** @@ -484,10 +486,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlQuery) return doLocalQuery((SqlQuery)qry); - else if (qry instanceof ContinuousQuery) - return queryContinuous((ContinuousQuery<K, V>)qry, true); - return query(qry, projection(true)); + if (qry instanceof ContinuousQuery) + return queryContinuous((ContinuousQuery<K,V>)qry, true); + + return doQuery(qry, projection(true)); } catch (Exception e) { if (e instanceof CacheException) @@ -1380,4 +1383,42 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); } + + /** + * Closeable iterator. + */ + private static abstract class ClIter<X, Y> extends GridCloseableIteratorAdapter<Y> { + /** */ + private X cur; + + private CacheQueryFuture<X> fut; + + /** + * @param fut Future. + */ + protected ClIter(CacheQueryFuture<X> fut) { + this.fut = fut; + } + + @Override protected Y onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + X e = cur; + + cur = null; + + return convert(e); + } + + protected abstract Y convert(X x); + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + } }