ignite-sql-tests - jdbc new api

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/260bebab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/260bebab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/260bebab

Branch: refs/heads/ignite-sql-tests
Commit: 260bebabd23598526d1711f9448a3936d142c4c0
Parents: 4476efe
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue Mar 17 02:53:47 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue Mar 17 02:53:47 2015 +0300

----------------------------------------------------------------------
 .../query/jdbc/GridCacheQueryJdbcTask.java      | 240 +++++++++++--------
 1 file changed, 136 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260bebab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
index 02cd295..b53a9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
@@ -18,13 +18,10 @@
 package org.apache.ignite.internal.processors.cache.query.jdbc;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.marshaller.*;
@@ -38,7 +35,6 @@ import java.util.*;
 import java.util.Date;
 import java.util.concurrent.*;
 
-import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
 
 /**
@@ -52,7 +48,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
     private static final Marshaller MARSHALLER = new JdkMarshaller();
 
     /** How long to store future (10 minutes). */
-    private static final int RMV_DELAY = 10 * 60;
+    private static final int RMV_DELAY = 10 * 60 * 1000;
 
     /** Scheduler. */
     private static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(1);
@@ -168,148 +164,148 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
 
         /** {@inheritDoc} */
         @Override public Object execute() {
-            try {
-                String cacheName = argument("cache");
-                String sql = argument("sql");
-                Long timeout = argument("timeout");
-                List<Object> args = argument("args");
-                UUID futId = argument("futId");
-                Integer pageSize = argument("pageSize");
-                Integer maxRows = argument("maxRows");
+            String cacheName = argument("cache");
+            String sql = argument("sql");
+            Long timeout = argument("timeout");
+            List<Object> args = argument("args");
+            UUID futId = argument("futId");
+            final int pageSize = argument("pageSize");
+            final int maxRows = argument("maxRows");
 
-                assert pageSize != null;
-                assert maxRows != null;
+            assert maxRows >= 0 : maxRows;
 
-                GridTuple4<CacheQueryFuture<List<?>>, Integer, Boolean, 
Collection<String>> t = null;
+            Cursor c = null;
 
-                Collection<String> tbls = null;
-                Collection<String> cols;
-                Collection<String> types = null;
+            Collection<String> tbls = null;
+            Collection<String> cols = null;
+            Collection<String> types = null;
 
-                if (first) {
-                    assert sql != null;
-                    assert timeout != null;
-                    assert args != null;
-                    assert futId == null;
+            if (first) {
+                assert sql != null;
+                assert timeout != null;
+                assert args != null;
+                assert futId == null;
 
-                    GridCache<?, ?> cache = ((IgniteEx) 
ignite).cachex(cacheName);
+                IgniteCache<?, ?> cache = ignite.jcache(cacheName);
 
-                    CacheQuery<List<?>> qry =
-                        ((GridCacheQueriesEx<?, 
?>)cache.queries()).createSqlFieldsQuery(sql, true);
+                SqlFieldsQuery qry = new 
SqlFieldsQuery(sql).setArgs(args.toArray());
 
-                    qry.pageSize(pageSize);
-                    qry.timeout(timeout);
+                qry.setPageSize(pageSize);
 
-                    // Query local and replicated caches only locally.
-                    if (cache.configuration().getCacheMode() != PARTITIONED)
-                        qry = qry.projection(ignite.cluster().forLocal());
+                QueryCursor<List<?>> cursor = cache.queryFields(qry);
 
-                    CacheQueryFuture<List<?>> fut = 
qry.execute(args.toArray());
+                Collection<GridQueryFieldMetadata> meta = null; // TODO
 
-                    Collection<GridQueryFieldMetadata> meta = 
((GridCacheQueryMetadataAware)fut).metadata().get();
+                tbls = new ArrayList<>(meta.size());
+                cols = new ArrayList<>(meta.size());
+                types = new ArrayList<>(meta.size());
 
-                    if (meta == null) {
-                        // Try to extract initial SQL exception.
-                        try {
-                            fut.get();
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (e.hasCause(SQLException.class))
-                                throw new 
GridInternalException(e.getCause(SQLException.class).getMessage(), e);
-                        }
+                for (GridQueryFieldMetadata desc : meta) {
+                    tbls.add(desc.typeName());
+                    cols.add(desc.fieldName().toUpperCase());
+                    types.add(desc.fieldTypeName());
+                }
 
-                        throw new GridInternalException("Query failed on all 
nodes. Probably you are requesting " +
-                            "nonexistent table (check database metadata) or 
you are trying to join data that is " +
-                            "stored in non-collocated mode.");
-                    }
+                futId = UUID.randomUUID();
 
-                    tbls = new ArrayList<>(meta.size());
-                    cols = new ArrayList<>(meta.size());
-                    types = new ArrayList<>(meta.size());
+                c = new Cursor(cursor, cursor.iterator(), 0, 
U.currentTimeMillis());
+            }
 
-                    for (GridQueryFieldMetadata desc : meta) {
-                        tbls.add(desc.typeName());
-                        cols.add(desc.fieldName().toUpperCase());
-                        types.add(desc.fieldTypeName());
-                    }
+            assert futId != null;
 
-                    futId = UUID.randomUUID();
+            ConcurrentMap<UUID,Cursor> m = ignite.cluster().nodeLocalMap();
 
-                    ignite.cluster().nodeLocalMap().put(futId, t = F.t(fut, 0, 
false, cols));
+            if (c == null)
+                c = m.get(futId);
 
-                    scheduleRemoval(futId);
-                }
+            if (c == null)
+                throw new IgniteException("Cursor was removed due to long 
inactivity.");
 
-                assert futId != null;
+            Collection<List<?>> rows = new ArrayList<>();
 
-                if (t == null)
-                    t = ignite.cluster().<UUID, 
GridTuple4<CacheQueryFuture<List<?>>, Integer, Boolean,
-                        Collection<String>>>nodeLocalMap().get(futId);
+            int totalCnt = c.totalCnt;
 
-                assert t != null;
+            boolean finished = true;
 
-                cols = t.get4();
+            for (List<?> row : c) {
+                List<Object> row0 = new ArrayList<>(row.size());
 
-                Collection<List<Object>> fields = new LinkedList<>();
+                for (Object val : row)
+                    row0.add(sqlType(val) ? val : val.toString());
 
-                CacheQueryFuture<List<?>> fut = t.get1();
+                rows.add(row0);
 
-                int pageCnt = 0;
-                int totalCnt = t.get2();
+                if (++totalCnt == maxRows) // If maxRows is 0 then unlimited
+                    break;
 
-                List<?> next;
+                if (rows.size() == pageSize) {
+                    finished = false;
+
+                    break;
+                }
+            }
 
-                while ((next = fut.next()) != null && pageCnt++ < pageSize && 
(maxRows == 0 || totalCnt++ < maxRows)) {
-                    fields.add(F.transformList(next, new C1<Object, Object>() {
-                        @Override public Object apply(Object val) {
-                            if (val != null && !sqlType(val))
-                                val = val.toString();
+            if (!finished) {
+                if (first) {
+                    m.put(futId, c);
 
-                            return val;
-                        }
-                    }));
+                    scheduleRemoval(futId, RMV_DELAY);
                 }
+                else if (!m.replace(futId, c, new Cursor(c.cursor, c.iter, 
totalCnt, U.currentTimeMillis())))
+                    assert !m.containsKey(futId) : "Concurrent cursor 
modification.";
+            }
+            else if (first) // No need to remove.
+                c.cursor.close();
+            else
+                remove(futId, c);
 
-                boolean finished = next == null || totalCnt == maxRows;
+            return first ? F.asList(ignite.cluster().localNode().id(), futId, 
tbls, cols, types, rows, finished) :
+                F.asList(rows, finished);
+        }
 
-                if (!finished)
-                    ignite.cluster().nodeLocalMap().put(futId, F.t(fut, 
totalCnt, true, cols));
-                else
-                    ignite.cluster().nodeLocalMap().remove(futId);
+        /**
+         * @param futId Cursor ID.
+         * @param c Cursor.
+         * @return {@code true} If succeeded.
+         */
+        private boolean remove(UUID futId, Cursor c) {
+            if (ignite.cluster().<UUID,Cursor>nodeLocalMap().remove(futId, c)) 
{
+                c.cursor.close();
 
-                return first ? F.asList(ignite.cluster().localNode().id(), 
futId, tbls, cols, types, fields, finished) :
-                    F.asList(fields, finished);
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
+                return true;
             }
+
+            return false;
         }
 
         /**
          * Schedules removal of stored future.
          *
          * @param id Future ID.
+         * @param delay Delay in milliseconds.
          */
-        private void scheduleRemoval(final UUID id) {
+        private void scheduleRemoval(final UUID id, long delay) {
             SCHEDULER.schedule(new CAX() {
                 @Override public void applyx() {
-                    GridTuple3<CacheQueryFuture<List<?>>, Integer, Boolean> t =
-                        ignite.cluster().<UUID, 
GridTuple3<CacheQueryFuture<List<?>>, Integer, Boolean>>nodeLocalMap().get(id);
+                    for (;;) {
+                        Cursor c = 
ignite.cluster().<UUID,Cursor>nodeLocalMap().get(id);
+
+                        if (c == null)
+                            break;
 
-                    if (t != null) {
-                        // If future was accessed since last scheduling,
-                        // set access flag to false and reschedule.
-                        if (t.get3()) {
-                            t.set3(false);
+                        // If the cursor was accessed since last scheduling 
then reschedule.
+                        long untouchedTime = U.currentTimeMillis() - 
c.lastAccessTime;
 
-                            scheduleRemoval(id);
+                        if (untouchedTime < RMV_DELAY) {
+                            scheduleRemoval(id, RMV_DELAY - untouchedTime);
+
+                            break;
                         }
-                        // Remove stored future otherwise.
-                        else
-                            ignite.cluster().nodeLocalMap().remove(id);
+                        else if (remove(id, c))
+                            break;
                     }
                 }
-            }, RMV_DELAY, TimeUnit.SECONDS);
+            }, delay, TimeUnit.MILLISECONDS);
         }
 
         /**
@@ -318,8 +314,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
          * @param obj Object.
          * @return Whether type of the object is SQL-complaint.
          */
-        private boolean sqlType(Object obj) {
-            return obj instanceof BigDecimal ||
+        private static boolean sqlType(Object obj) {
+            return obj == null ||
+                obj instanceof BigDecimal ||
                 obj instanceof Boolean ||
                 obj instanceof Byte ||
                 obj instanceof byte[] ||
@@ -343,4 +340,39 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
             return (T)args.get(key);
         }
     }
+
+    /**
+     * Cursor.
+     */
+    private static final class Cursor implements Iterable<List<?>> {
+        /** */
+        final QueryCursor<List<?>> cursor;
+
+        /** */
+        final Iterator<List<?>> iter;
+
+        /** */
+        final int totalCnt;
+
+        /** */
+        final long lastAccessTime;
+
+        /**
+         * @param cursor Cursor.
+         * @param iter Iterator.
+         * @param totalCnt Total row count already fetched.
+         * @param lastAccessTime Last cursor access timestamp.
+         */
+        private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter, 
int totalCnt, long lastAccessTime) {
+            this.cursor = cursor;
+            this.iter = iter;
+            this.totalCnt = totalCnt;
+            this.lastAccessTime = lastAccessTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<List<?>> iterator() {
+            return iter;
+        }
+    }
 }

Reply via email to