IGNITE-SQL Fixed event tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a368be4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a368be4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a368be4b Branch: refs/heads/ignite-143 Commit: a368be4b15514306fec0128590a3147b19a5aca3 Parents: 3658127 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Feb 13 17:12:01 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Feb 13 17:12:01 2015 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 40 +++++- .../query/h2/twostep/GridMapQueryExecutor.java | 19 +++ .../cache/IgniteCacheAbstractQuerySelfTest.java | 121 ++----------------- 3 files changed, 67 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a368be4b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index bfda08f..ae8a249 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -43,6 +45,7 @@ import java.lang.reflect.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.IgniteComponentType.*; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*; @@ -510,6 +513,22 @@ public class GridQueryProcessor extends GridProcessorAdapter { final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc, idx.backupFilter()); + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL, + null, + null, + sqlQry, + null, + null, + params, + null, + null)); + } + return new ClIter<Cache.Entry<K,V>>() { @Override public void close() throws Exception { i.close(); @@ -556,7 +575,26 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator(); + IgniteSpiCloseableIterator<List<?>> iterator = + idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator(); + + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL, + null, + null, + sql, + null, + null, + args, + null, + null)); + } + + return iterator; } catch (IgniteCheckedException e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a368be4b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ac5761b..3bb5320 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; @@ -36,6 +37,8 @@ import java.sql.*; import java.util.*; import java.util.concurrent.*; +import static org.apache.ignite.events.EventType.*; + /** * Map query executor. */ @@ -154,6 +157,22 @@ public class GridMapQueryExecutor { qr.results[i] = res; qr.resultSets[i] = rs; + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL, + null, + null, + qry.query(), + null, + null, + qry.parameters(), + null, + null)); + } + // Send the first page. sendNextPage(node, qr, i, req.pageSize(), res.getRowCount()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a368be4b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 7283b85..6dc6fbc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -69,9 +69,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** */ - private static final UUID subjId = UUID.fromString("8EB3B06D-0885-4B4A-9A54-02C93EF09B65"); - - /** */ protected Ignite ignite; /** @@ -1048,57 +1045,24 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testSqlQueryEvents() throws Exception { - testSqlQueryEvents(false); + checkSqlQueryEvents(); } /** - * @param customSubjId Use custom subject ID. * @throws Exception If failed. */ - private void testSqlQueryEvents(final boolean customSubjId) throws Exception { - final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); + private void checkSqlQueryEvents() throws Exception { final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); for (int i = 0; i < gridCount(); i++) { grid(i).events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { - assert evt instanceof CacheQueryReadEvent; - - CacheQueryReadEvent<Integer, Integer> qe = (CacheQueryReadEvent<Integer, Integer>)evt; - - assertEquals(SQL, qe.queryType()); - assertNull(qe.cacheName()); - - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - - assertEquals("Integer", qe.className()); - assertEquals("_key >= ?", qe.clause()); - assertNull(qe.scanQueryFilter()); - assertNull(qe.continuousQueryFilter()); - assertArrayEquals(new Integer[] { 10 }, qe.arguments()); - - map.put(qe.key(), qe.value()); - - latch.countDown(); - - return true; - } - }, EVT_CACHE_QUERY_OBJECT_READ); - - grid(i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; - assertEquals(SQL, qe.queryType()); assertNull(qe.cacheName()); - - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - - assertEquals("Integer", qe.className()); - assertEquals("_key >= ?", qe.clause()); + assertNotNull(qe.clause()); assertNull(qe.scanQueryFilter()); assertNull(qe.continuousQueryFilter()); assertArrayEquals(new Integer[] { 10 }, qe.arguments()); @@ -1118,32 +1082,22 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(sql(Integer.class, "_key >= ?").setArgs(10)); - if (customSubjId) - ((GridCacheQueryAdapter)q).subjectId(subjId); - q.getAll(); - assert latch.await(1000, MILLISECONDS); assert execLatch.await(1000, MILLISECONDS); - - assertEquals(10, map.size()); - - for (int i = 10; i < 20; i++) - assertEquals(i, map.get(i).intValue()); } /** * @throws Exception If failed. */ public void testScanQueryEvents() throws Exception { - testScanQueryEvents(false); + checkScanQueryEvents(); } /** - * @param customSubjId Use custom subject ID. * @throws Exception If failed. */ - private void testScanQueryEvents(final boolean customSubjId) throws Exception { + private void checkScanQueryEvents() throws Exception { final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(10); final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); @@ -1158,8 +1112,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assertEquals(SCAN, qe.queryType()); assertNull(qe.cacheName()); - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - assertNull(qe.className()); assertNull(null, qe.clause()); assertNotNull(qe.scanQueryFilter()); @@ -1183,8 +1135,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assertEquals(SCAN, qe.queryType()); assertNull(qe.cacheName()); - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - assertNull(qe.className()); assertNull(null, qe.clause()); assertNotNull(qe.scanQueryFilter()); @@ -1209,9 +1159,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac } })); - if (customSubjId) - ((GridCacheQueryAdapter)q).subjectId(subjId); - q.getAll(); assert latch.await(1000, MILLISECONDS); @@ -1248,8 +1195,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assertEquals(FULL_TEXT, qe.queryType()); assertNull(qe.cacheName()); - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - assertEquals("Person", qe.className()); assertEquals("White", qe.clause()); assertNull(qe.scanQueryFilter()); @@ -1273,8 +1218,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assertEquals(FULL_TEXT, qe.queryType()); assertNull(qe.cacheName()); - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - assertEquals("Person", qe.className()); assertEquals("White", qe.clause()); assertNull(qe.scanQueryFilter()); @@ -1297,9 +1240,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac QueryCursor<Cache.Entry<Integer, Person>> q = cache.query(new TextQuery(Person.class, "White")); - if (customSubjId) - ((GridCacheQueryAdapter)q).subjectId(subjId); - q.getAll(); assert latch.await(1000, MILLISECONDS); @@ -1315,61 +1255,27 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testFieldsQueryEvents() throws Exception { - testFieldsQueryEvents(false); + checkFieldsQueryEvents(); } /** * @throws Exception If failed. */ - private void testFieldsQueryEvents(final boolean customSubjId) throws Exception { - final Map<Integer, String> map = new ConcurrentHashMap8<>(); - final CountDownLatch latch = new CountDownLatch(10); + private void checkFieldsQueryEvents() throws Exception { final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); for (int i = 0; i < gridCount(); i++) { grid(i).events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { - assert evt instanceof CacheQueryReadEvent; - - CacheQueryReadEvent qe = (CacheQueryReadEvent)evt; - - assertEquals(SQL_FIELDS, qe.queryType()); - assertNull(qe.cacheName()); - - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - - assertNull(qe.className()); - assertEquals("select _key, name from Person where salary > ?", qe.clause()); - assertNull(qe.scanQueryFilter()); - assertNull(qe.continuousQueryFilter()); - assertArrayEquals(new Integer[] { 10 }, qe.arguments()); - - List<?> row = (List<?>)qe.row(); - - map.put((Integer)row.get(0), (String)row.get(1)); - - latch.countDown(); - - return true; - } - }, EVT_CACHE_QUERY_OBJECT_READ); - - grid(i).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { assert evt instanceof CacheQueryExecutedEvent; CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; - assertEquals(SQL_FIELDS, qe.queryType()); assertNull(qe.cacheName()); - - assertEquals(customSubjId ? subjId : grid(0).localNode().id(), qe.subjectId()); - - assertNull(qe.className()); - assertEquals("select _key, name from Person where salary > ?", qe.clause()); + assertNotNull(qe.clause()); assertNull(qe.scanQueryFilter()); assertNull(qe.continuousQueryFilter()); - assertArrayEquals(new Integer[] { 10 }, qe.arguments()); + assertArrayEquals(new Integer[]{10}, qe.arguments()); execLatch.countDown(); @@ -1386,18 +1292,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac QueryCursor<List<?>> q = cache .queryFields(sql("select _key, name from Person where salary > ?").setArgs(10)); - if (customSubjId) - ((GridCacheQueryAdapter)q).subjectId(subjId); - q.getAll(); - assert latch.await(1000, MILLISECONDS); assert execLatch.await(1000, MILLISECONDS); - - assertEquals(10, map.size()); - - for (int i = 11; i <= 20; i++) - assertEquals("Person " + i, map.get(i)); } /**