ignite-sql-tests - jdbc new api
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/260bebab Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/260bebab Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/260bebab Branch: refs/heads/ignite-sql-tests Commit: 260bebabd23598526d1711f9448a3936d142c4c0 Parents: 4476efe Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Mar 17 02:53:47 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Mar 17 02:53:47 2015 +0300 ---------------------------------------------------------------------- .../query/jdbc/GridCacheQueryJdbcTask.java | 240 +++++++++++-------- 1 file changed, 136 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/260bebab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index 02cd295..b53a9e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -18,13 +18,10 @@ package org.apache.ignite.internal.processors.cache.query.jdbc; import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; @@ -38,7 +35,6 @@ import java.util.*; import java.util.Date; import java.util.concurrent.*; -import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.compute.ComputeJobResultPolicy.*; /** @@ -52,7 +48,7 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { private static final Marshaller MARSHALLER = new JdkMarshaller(); /** How long to store future (10 minutes). */ - private static final int RMV_DELAY = 10 * 60; + private static final int RMV_DELAY = 10 * 60 * 1000; /** Scheduler. */ private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1); @@ -168,148 +164,148 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { /** {@inheritDoc} */ @Override public Object execute() { - try { - String cacheName = argument("cache"); - String sql = argument("sql"); - Long timeout = argument("timeout"); - List<Object> args = argument("args"); - UUID futId = argument("futId"); - Integer pageSize = argument("pageSize"); - Integer maxRows = argument("maxRows"); + String cacheName = argument("cache"); + String sql = argument("sql"); + Long timeout = argument("timeout"); + List<Object> args = argument("args"); + UUID futId = argument("futId"); + final int pageSize = argument("pageSize"); + final int maxRows = argument("maxRows"); - assert pageSize != null; - assert maxRows != null; + assert maxRows >= 0 : maxRows; - GridTuple4<CacheQueryFuture<List<?>>, Integer, Boolean, Collection<String>> t = null; + Cursor c = null; - Collection<String> tbls = null; - Collection<String> cols; - Collection<String> types = null; + Collection<String> tbls = null; + Collection<String> cols = null; + Collection<String> types = null; - if (first) { - assert sql != null; - assert timeout != null; - assert args != null; - assert futId == null; + if (first) { + assert sql != null; + assert timeout != null; + assert args != null; + assert futId == null; - GridCache<?, ?> cache = ((IgniteEx) ignite).cachex(cacheName); + IgniteCache<?, ?> cache = ignite.jcache(cacheName); - CacheQuery<List<?>> qry = - ((GridCacheQueriesEx<?, ?>)cache.queries()).createSqlFieldsQuery(sql, true); + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setArgs(args.toArray()); - qry.pageSize(pageSize); - qry.timeout(timeout); + qry.setPageSize(pageSize); - // Query local and replicated caches only locally. - if (cache.configuration().getCacheMode() != PARTITIONED) - qry = qry.projection(ignite.cluster().forLocal()); + QueryCursor<List<?>> cursor = cache.queryFields(qry); - CacheQueryFuture<List<?>> fut = qry.execute(args.toArray()); + Collection<GridQueryFieldMetadata> meta = null; // TODO - Collection<GridQueryFieldMetadata> meta = ((GridCacheQueryMetadataAware)fut).metadata().get(); + tbls = new ArrayList<>(meta.size()); + cols = new ArrayList<>(meta.size()); + types = new ArrayList<>(meta.size()); - if (meta == null) { - // Try to extract initial SQL exception. - try { - fut.get(); - } - catch (IgniteCheckedException e) { - if (e.hasCause(SQLException.class)) - throw new GridInternalException(e.getCause(SQLException.class).getMessage(), e); - } + for (GridQueryFieldMetadata desc : meta) { + tbls.add(desc.typeName()); + cols.add(desc.fieldName().toUpperCase()); + types.add(desc.fieldTypeName()); + } - throw new GridInternalException("Query failed on all nodes. Probably you are requesting " + - "nonexistent table (check database metadata) or you are trying to join data that is " + - "stored in non-collocated mode."); - } + futId = UUID.randomUUID(); - tbls = new ArrayList<>(meta.size()); - cols = new ArrayList<>(meta.size()); - types = new ArrayList<>(meta.size()); + c = new Cursor(cursor, cursor.iterator(), 0, U.currentTimeMillis()); + } - for (GridQueryFieldMetadata desc : meta) { - tbls.add(desc.typeName()); - cols.add(desc.fieldName().toUpperCase()); - types.add(desc.fieldTypeName()); - } + assert futId != null; - futId = UUID.randomUUID(); + ConcurrentMap<UUID,Cursor> m = ignite.cluster().nodeLocalMap(); - ignite.cluster().nodeLocalMap().put(futId, t = F.t(fut, 0, false, cols)); + if (c == null) + c = m.get(futId); - scheduleRemoval(futId); - } + if (c == null) + throw new IgniteException("Cursor was removed due to long inactivity."); - assert futId != null; + Collection<List<?>> rows = new ArrayList<>(); - if (t == null) - t = ignite.cluster().<UUID, GridTuple4<CacheQueryFuture<List<?>>, Integer, Boolean, - Collection<String>>>nodeLocalMap().get(futId); + int totalCnt = c.totalCnt; - assert t != null; + boolean finished = true; - cols = t.get4(); + for (List<?> row : c) { + List<Object> row0 = new ArrayList<>(row.size()); - Collection<List<Object>> fields = new LinkedList<>(); + for (Object val : row) + row0.add(sqlType(val) ? val : val.toString()); - CacheQueryFuture<List<?>> fut = t.get1(); + rows.add(row0); - int pageCnt = 0; - int totalCnt = t.get2(); + if (++totalCnt == maxRows) // If maxRows is 0 then unlimited + break; - List<?> next; + if (rows.size() == pageSize) { + finished = false; + + break; + } + } - while ((next = fut.next()) != null && pageCnt++ < pageSize && (maxRows == 0 || totalCnt++ < maxRows)) { - fields.add(F.transformList(next, new C1<Object, Object>() { - @Override public Object apply(Object val) { - if (val != null && !sqlType(val)) - val = val.toString(); + if (!finished) { + if (first) { + m.put(futId, c); - return val; - } - })); + scheduleRemoval(futId, RMV_DELAY); } + else if (!m.replace(futId, c, new Cursor(c.cursor, c.iter, totalCnt, U.currentTimeMillis()))) + assert !m.containsKey(futId) : "Concurrent cursor modification."; + } + else if (first) // No need to remove. + c.cursor.close(); + else + remove(futId, c); - boolean finished = next == null || totalCnt == maxRows; + return first ? F.asList(ignite.cluster().localNode().id(), futId, tbls, cols, types, rows, finished) : + F.asList(rows, finished); + } - if (!finished) - ignite.cluster().nodeLocalMap().put(futId, F.t(fut, totalCnt, true, cols)); - else - ignite.cluster().nodeLocalMap().remove(futId); + /** + * @param futId Cursor ID. + * @param c Cursor. + * @return {@code true} If succeeded. + */ + private boolean remove(UUID futId, Cursor c) { + if (ignite.cluster().<UUID,Cursor>nodeLocalMap().remove(futId, c)) { + c.cursor.close(); - return first ? F.asList(ignite.cluster().localNode().id(), futId, tbls, cols, types, fields, finished) : - F.asList(fields, finished); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + return true; } + + return false; } /** * Schedules removal of stored future. * * @param id Future ID. + * @param delay Delay in milliseconds. */ - private void scheduleRemoval(final UUID id) { + private void scheduleRemoval(final UUID id, long delay) { SCHEDULER.schedule(new CAX() { @Override public void applyx() { - GridTuple3<CacheQueryFuture<List<?>>, Integer, Boolean> t = - ignite.cluster().<UUID, GridTuple3<CacheQueryFuture<List<?>>, Integer, Boolean>>nodeLocalMap().get(id); + for (;;) { + Cursor c = ignite.cluster().<UUID,Cursor>nodeLocalMap().get(id); + + if (c == null) + break; - if (t != null) { - // If future was accessed since last scheduling, - // set access flag to false and reschedule. - if (t.get3()) { - t.set3(false); + // If the cursor was accessed since last scheduling then reschedule. + long untouchedTime = U.currentTimeMillis() - c.lastAccessTime; - scheduleRemoval(id); + if (untouchedTime < RMV_DELAY) { + scheduleRemoval(id, RMV_DELAY - untouchedTime); + + break; } - // Remove stored future otherwise. - else - ignite.cluster().nodeLocalMap().remove(id); + else if (remove(id, c)) + break; } } - }, RMV_DELAY, TimeUnit.SECONDS); + }, delay, TimeUnit.MILLISECONDS); } /** @@ -318,8 +314,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { * @param obj Object. * @return Whether type of the object is SQL-complaint. */ - private boolean sqlType(Object obj) { - return obj instanceof BigDecimal || + private static boolean sqlType(Object obj) { + return obj == null || + obj instanceof BigDecimal || obj instanceof Boolean || obj instanceof Byte || obj instanceof byte[] || @@ -343,4 +340,39 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> { return (T)args.get(key); } } + + /** + * Cursor. + */ + private static final class Cursor implements Iterable<List<?>> { + /** */ + final QueryCursor<List<?>> cursor; + + /** */ + final Iterator<List<?>> iter; + + /** */ + final int totalCnt; + + /** */ + final long lastAccessTime; + + /** + * @param cursor Cursor. + * @param iter Iterator. + * @param totalCnt Total row count already fetched. + * @param lastAccessTime Last cursor access timestamp. + */ + private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter, int totalCnt, long lastAccessTime) { + this.cursor = cursor; + this.iter = iter; + this.totalCnt = totalCnt; + this.lastAccessTime = lastAccessTime; + } + + /** {@inheritDoc} */ + @Override public Iterator<List<?>> iterator() { + return iter; + } + } }