Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-10002 162bdac1c -> 5cabdf25e


ignite-gg-10002 - events


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cabdf25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cabdf25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cabdf25

Branch: refs/heads/ignite-gg-10002
Commit: 5cabdf25e8c8e20015020b399621c3fd9b75212e
Parents: 162bdac
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Apr 2 17:10:21 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Apr 2 17:10:21 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 80 ++++++++++++++++++--
 1 file changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cabdf25/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index a0718e5..959d1cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -182,7 +183,9 @@ public class GridMapQueryExecutor implements GridMessageListener {
             throw new IgniteException(e);
         }
 
-        QueryResults qr = new QueryResults(req.requestId(), qrys.size());
+        GridCacheContext<?,?> cctx = ctx.cache().internalCache(req.space()).context();
+
+        QueryResults qr = new QueryResults(req.requestId(), qrys.size(), cctx);
 
         if (nodeRess.put(req.requestId(), qr) != null)
             throw new IllegalStateException();
@@ -207,19 +210,19 @@ public class GridMapQueryExecutor implements GridMessageListener {
                         "SQL query executed.",
                         EVT_CACHE_QUERY_EXECUTED,
                         CacheQueryType.SQL,
-                        null,
+                        cctx.namex(),
                         null,
                         qry.query(),
                         null,
                         null,
                         qry.parameters(),
-                        null,
+                        node.id(),
                         null));
                 }
 
                 assert rs instanceof JdbcResultSet : rs.getClass();
 
-                qr.addResult(i, rs);
+                qr.addResult(i, qry, node.id(), rs);
 
                 if (qr.canceled) {
                     qr.result(i).close();
@@ -393,14 +396,19 @@ public class GridMapQueryExecutor implements GridMessageListener {
         private final AtomicReferenceArray<QueryResult> results;
 
         /** */
+        private final GridCacheContext<?,?> cctx;
+
+        /** */
         private volatile boolean canceled;
 
         /**
          * @param qryReqId Query request ID.
          * @param qrys Number of queries.
+         * @param cctx Cache context.
          */
-        private QueryResults(long qryReqId, int qrys) {
+        private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) {
             this.qryReqId = qryReqId;
+            this.cctx = cctx;
 
             results = new AtomicReferenceArray<>(qrys);
         }
@@ -415,10 +423,12 @@ public class GridMapQueryExecutor implements GridMessageListener {
 
         /**
          * @param qry Query result index.
+         * @param q Query object.
+         * @param qrySrcNodeId Query source node.
          * @param rs Result set.
          */
-        void addResult(int qry, ResultSet rs) {
-            if (!results.compareAndSet(qry, null, new QueryResult(rs)))
+        void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs) {
+            if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q)))
                 throw new IllegalStateException();
         }
 
@@ -462,6 +472,15 @@ public class GridMapQueryExecutor implements GridMessageListener {
         private final ResultSet rs;
 
         /** */
+        private final GridCacheContext<?,?> cctx;
+
+        /** */
+        private final GridCacheSqlQuery qry;
+
+        /** */
+        private final UUID qrySrcNodeId;
+
+        /** */
         private int page;
 
         /** */
@@ -472,9 +491,15 @@ public class GridMapQueryExecutor implements GridMessageListener {
 
         /**
          * @param rs Result set.
+         * @param cctx Cache context.
+         * @param qrySrcNodeId Query source node.
+         * @param qry Query.
          */
-        private QueryResult(ResultSet rs) {
+        private QueryResult(ResultSet rs, GridCacheContext<?,?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
             this.rs = rs;
+            this.cctx = cctx;
+            this.qry = qry;
+            this.qrySrcNodeId = qrySrcNodeId;
 
             try {
                 res = (ResultInterface)RESULT_FIELD.get(rs);
@@ -495,18 +520,57 @@ public class GridMapQueryExecutor implements GridMessageListener {
             if (closed)
                 return true;
 
+            boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
             page++;
 
             for (int i = 0 ; i < pageSize; i++) {
                 if (!res.next())
                     return true;
 
+                Value[] row = res.currentRow();
+
+                assert row != null;
+
+                if (readEvt) {
+                    cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                        cctx.localNode(),
+                        "SQL fields query result set row read.",
+                        EVT_CACHE_QUERY_OBJECT_READ,
+                        CacheQueryType.SQL,
+                        cctx.namex(),
+                        null,
+                        qry.query(),
+                        null,
+                        null,
+                        qry.parameters(),
+                        qrySrcNodeId,
+                        null,
+                        null,
+                        null,
+                        null,
+                        row(row)));
+                }
+
                 rows.add(res.currentRow());
             }
 
             return false;
         }
 
+        /**
+         * @param row Values array row.
+         * @return Objects list row.
+         */
+        private List<?> row(Value[] row) {
+            List<Object> res = new ArrayList<>(row.length);
+
+            for (Value v : row)
+                res.add(v.getObject());
+
+            return res;
+        }
+
         /** {@inheritDoc} */
         @Override public synchronized void close() {
             if (closed)

Reply via email to