Repository: incubator-ignite Updated Branches: refs/heads/ignite-gg-10002 162bdac1c -> 5cabdf25e
ignite-gg-10002 - events Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cabdf25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cabdf25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cabdf25 Branch: refs/heads/ignite-gg-10002 Commit: 5cabdf25e8c8e20015020b399621c3fd9b75212e Parents: 162bdac Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Apr 2 17:10:21 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Apr 2 17:10:21 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 80 ++++++++++++++++++-- 1 file changed, 72 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cabdf25/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index a0718e5..959d1cc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -23,6 +23,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; @@ -182,7 +183,9 @@ public class GridMapQueryExecutor implements GridMessageListener { throw new IgniteException(e); } - QueryResults qr = new QueryResults(req.requestId(), qrys.size()); + GridCacheContext<?,?> cctx = ctx.cache().internalCache(req.space()).context(); + + QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx); if (nodeRess.put(req.requestId(), qr) != null) throw new IllegalStateException(); @@ -207,19 +210,19 @@ public class GridMapQueryExecutor implements GridMessageListener { "SQL query executed.", EVT_CACHE_QUERY_EXECUTED, CacheQueryType.SQL, - null, + cctx.namex(), null, qry.query(), null, null, qry.parameters(), - null, + node.id(), null)); } assert rs instanceof JdbcResultSet : rs.getClass(); - qr.addResult(i, rs); + qr.addResult(i, qry, node.id(), rs); if (qr.canceled) { qr.result(i).close(); @@ -393,14 +396,19 @@ public class GridMapQueryExecutor implements GridMessageListener { private final AtomicReferenceArray<QueryResult> results; /** */ + private final GridCacheContext<?,?> cctx; + + /** */ private volatile boolean canceled; /** * @param qryReqId Query request ID. * @param qrys Number of queries. + * @param cctx Cache context. */ - private QueryResults(long qryReqId, int qrys) { + private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) { this.qryReqId = qryReqId; + this.cctx = cctx; results = new AtomicReferenceArray<>(qrys); } @@ -415,10 +423,12 @@ public class GridMapQueryExecutor implements GridMessageListener { /** * @param qry Query result index. + * @param q Query object. + * @param qrySrcNodeId Query source node. * @param rs Result set. */ - void addResult(int qry, ResultSet rs) { - if (!results.compareAndSet(qry, null, new QueryResult(rs))) + void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs) { + if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q))) throw new IllegalStateException(); } @@ -462,6 +472,15 @@ public class GridMapQueryExecutor implements GridMessageListener { private final ResultSet rs; /** */ + private final GridCacheContext<?,?> cctx; + + /** */ + private final GridCacheSqlQuery qry; + + /** */ + private final UUID qrySrcNodeId; + + /** */ private int page; /** */ @@ -472,9 +491,15 @@ public class GridMapQueryExecutor implements GridMessageListener { /** * @param rs Result set. + * @param cctx Cache context. + * @param qrySrcNodeId Query source node. + * @param qry Query. */ - private QueryResult(ResultSet rs) { + private QueryResult(ResultSet rs, GridCacheContext<?,?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) { this.rs = rs; + this.cctx = cctx; + this.qry = qry; + this.qrySrcNodeId = qrySrcNodeId; try { res = (ResultInterface)RESULT_FIELD.get(rs); @@ -495,18 +520,57 @@ public class GridMapQueryExecutor implements GridMessageListener { if (closed) return true; + boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + page++; for (int i = 0 ; i < pageSize; i++) { if (!res.next()) return true; + Value[] row = res.currentRow(); + + assert row != null; + + if (readEvt) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "SQL fields query result set row read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SQL, + cctx.namex(), + null, + qry.query(), + null, + null, + qry.parameters(), + qrySrcNodeId, + null, + null, + null, + null, + row(row))); + } + rows.add(res.currentRow()); } return false; } + /** + * @param row Values array row. + * @return Objects list row. + */ + private List<?> row(Value[] row) { + List<Object> res = new ArrayList<>(row.length); + + for (Value v : row) + res.add(v.getObject()); + + return res; + } + /** {@inheritDoc} */ @Override public synchronized void close() { if (closed)