ignite-sql-old - fixes

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a50e0524
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a50e0524
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a50e0524

Branch: refs/heads/ignite-sql-old
Commit: a50e0524421dfa267488b8fa3b430c0e59f33ba8
Parents: 42e5786
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Sun Feb 15 05:08:02 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Sun Feb 15 05:08:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 111 +++++++++++++------
 1 file changed, 76 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a50e0524/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b4cfc09..1361b8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -264,7 +264,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Entry<K,V>> query(Query filter, @Nullable ClusterGroup 
grp) {
+    private QueryCursor<Entry<K,V>> doQuery(Query filter, @Nullable 
ClusterGroup grp) {
         final CacheQuery<Map.Entry<K,V>> qry;
         final CacheQueryFuture<Map.Entry<K,V>> fut;
 
@@ -296,30 +296,27 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
 
             fut = qry.execute(((SpiQuery)filter).getArgs());
         }
-        else
-            throw new IgniteException("Unsupported query predicate: " + 
filter);
+        else if (filter instanceof SqlQuery) {
+            SqlQuery q = (SqlQuery)filter;
 
-        return new QueryCursorImpl<>(new 
GridCloseableIteratorAdapter<Entry<K,V>>() {
-            /** */
-            Map.Entry<K,V> cur;
+            qry = 
((GridCacheQueriesEx)delegate.queries()).createSqlQuery(q.getType(), 
q.getSql());
 
-            @Override protected Entry<K,V> onNext() throws 
IgniteCheckedException {
-                if (!onHasNext())
-                    throw new NoSuchElementException();
+            if (grp != null)
+                qry.projection(grp);
 
-                Map.Entry<K,V> e = cur;
+            qry.pageSize(q.getPageSize());
 
-                cur = null;
+            qry.enableDedup(false);
+            qry.includeBackups(false);
 
-                return new CacheEntryImpl<>(e.getKey(), e.getValue());
-            }
-
-            @Override protected boolean onHasNext() throws 
IgniteCheckedException {
-                return cur != null || (cur = fut.next()) != null;
-            }
+            fut = qry.execute(q.getArgs());
+        }
+        else
+            throw new IgniteException("Unsupported query predicate: " + 
filter);
 
-            @Override protected void onClose() throws IgniteCheckedException {
-                fut.cancel();
+        return new QueryCursorImpl<>(new 
ClIter<Map.Entry<K,V>,Cache.Entry<K,V>>(fut) {
+            @Override protected Cache.Entry<K,V> convert(Map.Entry<K,V> e) {
+                return new CacheEntryImpl<>(e.getKey(), e.getValue());
             }
         });
     }
@@ -387,7 +384,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Local node cluster group.
      */
     private ClusterGroup projection(boolean local) {
-        return local || ctx.isLocal() || ctx.isReplicated() ? 
ctx.kernalContext().grid().forLocal() : null;
+        return local ? ctx.kernalContext().grid().forLocal() : null;
     }
 
     /** {@inheritDoc} */
@@ -400,13 +397,10 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         try {
             validate(qry);
 
-            if (qry instanceof SqlQuery) {
-                return null; // TODO
-            }
-            else if (qry instanceof ContinuousQuery)
+            if (qry instanceof ContinuousQuery)
                 return queryContinuous((ContinuousQuery<K, V>)qry, false);
 
-            return query(qry, projection(false));
+            return doQuery(qry, projection(false));
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -428,7 +422,17 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         try {
             validate(qry);
 
-            return null; // TODO
+            CacheQuery<List<?>> q = 
((GridCacheQueriesEx<K,V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(),
 false);
+
+            q.pageSize(qry.getPageSize());
+
+            CacheQueryFuture<List<?>> fut = q.execute(qry.getArgs());
+
+            return new QueryCursorImpl<>(new ClIter<List<?>, List<?>>(fut) {
+                @Override protected List<?> convert(List<?> row) {
+                    return row;
+                }
+            });
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -446,9 +450,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) {
-        return null; // TODO
-//            new 
QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
-//            ctx.name(), p.getType(), p.getSql(), p.getArgs()));
+        return new 
QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
+            ctx.name(), p.getType(), p.getSql(), p.getArgs()));
     }
 
     /**
@@ -456,9 +459,8 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) {
-        return null; // TODO
-//            new 
QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
-//            ctx.name(), q.getSql(), q.getArgs()));
+        return new 
QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
+            ctx.name(), q.getSql(), q.getArgs()));
     }
 
     /**
@@ -484,10 +486,11 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
 
             if (qry instanceof SqlQuery)
                 return doLocalQuery((SqlQuery)qry);
-            else if (qry instanceof ContinuousQuery)
-                return queryContinuous((ContinuousQuery<K, V>)qry, true);
 
-            return query(qry, projection(true));
+            if (qry instanceof ContinuousQuery)
+                return queryContinuous((ContinuousQuery<K,V>)qry, true);
+
+            return doQuery(qry, projection(true));
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -1380,4 +1383,42 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
     @Override public String toString() {
         return S.toString(IgniteCacheProxy.class, this);
     }
+
+    /**
+     * Closeable iterator.
+     */
+    private static abstract class ClIter<X, Y> extends 
GridCloseableIteratorAdapter<Y> {
+        /** */
+        private X cur;
+
+        private CacheQueryFuture<X> fut;
+
+        /**
+         * @param fut Future.
+         */
+        protected ClIter(CacheQueryFuture<X> fut) {
+            this.fut = fut;
+        }
+
+        @Override protected Y onNext() throws IgniteCheckedException {
+            if (!onHasNext())
+                throw new NoSuchElementException();
+
+            X e = cur;
+
+            cur = null;
+
+            return convert(e);
+        }
+
+        protected abstract Y convert(X x);
+
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            return cur != null || (cur = fut.next()) != null;
+        }
+
+        @Override protected void onClose() throws IgniteCheckedException {
+            fut.cancel();
+        }
+    }
 }

Reply via email to