KYLIN-1719 Add config in scan request to control compress the query result or 
not


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/af5965cc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/af5965cc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/af5965cc

Branch: refs/heads/master
Commit: af5965ccb76342a35b1313a1f606108f6db66ef7
Parents: 3adecb0
Author: sunyerui <sunye...@gmail.com>
Authored: Sat May 21 13:48:40 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Sun May 22 14:14:43 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   8 +-
 .../coprocessor/endpoint/CubeVisitService.java  |   6 +-
 .../endpoint/generated/CubeVisitProtos.java     | 156 +++++++++++++++++--
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 5 files changed, 156 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 76961c5..5355086 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -680,4 +680,8 @@ abstract public class KylinConfigBase implements 
Serializable {
         return result;
     }
 
+    public boolean getCompressionResult() {
+        return 
Boolean.parseBoolean(getOptional("kylin.query.endpoint.compression.result", 
"true"));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 12b1fa8..8e37361 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -320,6 +320,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
         final AtomicInteger totalScannedCount = new AtomicInteger(0);
         final ExpectedSizeIterator epResultItr = new 
ExpectedSizeIterator(shardNum);
+        final boolean compressionResult = 
cubeSeg.getCubeDesc().getConfig().getCompressionResult();
         final CubeVisitProtos.CubeVisitRequest.Builder builder = 
CubeVisitProtos.CubeVisitRequest.newBuilder();
         
builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
         for (IntList intList : hbaseColumnsToGTIntList) {
@@ -329,6 +330,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setBehavior(toggle);
         builder.setStartTime(System.currentTimeMillis());
         builder.setTimeout(epResultItr.getTimeout());
+        builder.setUseCompression(compressionResult);
 
         for (final Pair<byte[], byte[]> epRange : 
getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {
@@ -353,7 +355,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                             abnormalFinish = true;
                         } else {
                             try {
-                                
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+                                if (compressionResult) {
+                                    
epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
+                                } else {
+                                    
epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()));
+                                }
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(logHeader + "Error 
when decompressing", e);
                             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 0cd35f1..510e735 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -315,7 +315,11 @@ public class CubeVisitService extends 
CubeVisitProtos.CubeVisitService implement
             } else {
                 allRows = new byte[0];
             }
-            compressedAllRows = CompressionUtils.compress(allRows);
+            if (request.hasUseCompression() && request.getUseCompression() == 
false) {
+                compressedAllRows = allRows;
+            } else {
+                compressedAllRows = CompressionUtils.compress(allRows);
+            }
 
             appendProfileInfo(sb, "compress done");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index 53393e8..1a9bf05 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -116,6 +116,24 @@ public final class CubeVisitProtos {
      * </pre>
      */
     long getTimeout();
+
+    // optional bool useCompression = 8;
+    /**
+     * <code>optional bool useCompression = 8;</code>
+     *
+     * <pre>
+     * compress result or not
+     * </pre>
+     */
+    boolean hasUseCompression();
+    /**
+     * <code>optional bool useCompression = 8;</code>
+     *
+     * <pre>
+     * compress result or not
+     * </pre>
+     */
+    boolean getUseCompression();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -206,6 +224,11 @@ public final class CubeVisitProtos {
               timeout_ = input.readInt64();
               break;
             }
+            case 64: {
+              bitField0_ |= 0x00000040;
+              useCompression_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -928,6 +951,30 @@ public final class CubeVisitProtos {
       return timeout_;
     }
 
+    // optional bool useCompression = 8;
+    public static final int USECOMPRESSION_FIELD_NUMBER = 8;
+    private boolean useCompression_;
+    /**
+     * <code>optional bool useCompression = 8;</code>
+     *
+     * <pre>
+     * compress result or not
+     * </pre>
+     */
+    public boolean hasUseCompression() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    /**
+     * <code>optional bool useCompression = 8;</code>
+     *
+     * <pre>
+     * compress result or not
+     * </pre>
+     */
+    public boolean getUseCompression() {
+      return useCompression_;
+    }
+
     private void initFields() {
       behavior_ = "";
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
@@ -936,6 +983,7 @@ public final class CubeVisitProtos {
       hbaseColumnsToGT_ = java.util.Collections.emptyList();
       startTime_ = 0L;
       timeout_ = 0L;
+      useCompression_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -994,6 +1042,9 @@ public final class CubeVisitProtos {
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         output.writeInt64(7, timeout_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBool(8, useCompression_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1031,6 +1082,10 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(7, timeout_);
       }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(8, useCompression_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1086,6 +1141,11 @@ public final class CubeVisitProtos {
         result = result && (getTimeout()
             == other.getTimeout());
       }
+      result = result && (hasUseCompression() == other.hasUseCompression());
+      if (hasUseCompression()) {
+        result = result && (getUseCompression()
+            == other.getUseCompression());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1127,6 +1187,10 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + TIMEOUT_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getTimeout());
       }
+      if (hasUseCompression()) {
+        hash = (37 * hash) + USECOMPRESSION_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getUseCompression());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1255,6 +1319,8 @@ public final class CubeVisitProtos {
         bitField0_ = (bitField0_ & ~0x00000020);
         timeout_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000040);
+        useCompression_ = false;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -1316,6 +1382,10 @@ public final class CubeVisitProtos {
           to_bitField0_ |= 0x00000020;
         }
         result.timeout_ = timeout_;
+        if (((from_bitField0_ & 0x00000080) == 0x00000080)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.useCompression_ = useCompression_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1378,6 +1448,9 @@ public final class CubeVisitProtos {
         if (other.hasTimeout()) {
           setTimeout(other.getTimeout());
         }
+        if (other.hasUseCompression()) {
+          setUseCompression(other.getUseCompression());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1946,6 +2019,55 @@ public final class CubeVisitProtos {
         return this;
       }
 
+      // optional bool useCompression = 8;
+      private boolean useCompression_ ;
+      /**
+       * <code>optional bool useCompression = 8;</code>
+       *
+       * <pre>
+       * compress result or not
+       * </pre>
+       */
+      public boolean hasUseCompression() {
+        return ((bitField0_ & 0x00000080) == 0x00000080);
+      }
+      /**
+       * <code>optional bool useCompression = 8;</code>
+       *
+       * <pre>
+       * compress result or not
+       * </pre>
+       */
+      public boolean getUseCompression() {
+        return useCompression_;
+      }
+      /**
+       * <code>optional bool useCompression = 8;</code>
+       *
+       * <pre>
+       * compress result or not
+       * </pre>
+       */
+      public Builder setUseCompression(boolean value) {
+        bitField0_ |= 0x00000080;
+        useCompression_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool useCompression = 8;</code>
+       *
+       * <pre>
+       * compress result or not
+       * </pre>
+       */
+      public Builder clearUseCompression() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        useCompression_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -4300,26 +4422,26 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\367\001\n\020C" +
       "ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" +
       "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 
\002(\014\022\032" +
       "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" +
       "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" +
-      "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 
\002(\003\032\027\n\007I" +
-      "ntList\022\014\n\004ints\030\001 
\003(\005\"\321\002\n\021CubeVisitRespon" +
-      "se\022\026\n\016compressedRows\030\001 
\002(\014\022\'\n\005stats\030\002 \002(",
-      "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" +
-      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
-      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 
\001(\005\022\032\n\022aggr" +
-      "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " +
-      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 
\001(\001\022\031\n\021f" +
-      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t" +
-      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n 
\001(\005" +
-      "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" +
-      "VisitRequest\032\022.CubeVisitResponseB`\nEorg." +
-      "apache.kylin.storage.hbase.cube.v2.copro",
-      "cessor.endpoint.generatedB\017CubeVisitProt" +
-      "osH\001\210\001\001\240\001\001"
+      "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 
\002(\003\022\026\n\016u" +
+      "seCompression\030\010 
\001(\010\032\027\n\007IntList\022\014\n\004ints\030\001" +
+      " \003(\005\"\321\002\n\021CubeVisitResponse\022\026\n\016compressed",
+      "Rows\030\001 \002(\014\022\'\n\005stats\030\002 
\002(\0132\030.CubeVisitRes" +
+      "ponse.Stats\032\372\001\n\005Stats\022\030\n\020serviceStartTim" +
+      "e\030\001 \001(\003\022\026\n\016serviceEndTime\030\002 
\001(\003\022\027\n\017scann" +
+      "edRowCount\030\003 \001(\005\022\032\n\022aggregatedRowCount\030\004" +
+      " \001(\005\022\025\n\rsystemCpuLoad\030\005 
\001(\001\022\036\n\026freePhysi" +
+      "calMemorySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize" +
+      "\030\007 \001(\001\022\020\n\010hostname\030\010 
\001(\t\022\016\n\006etcMsg\030\t \001(\t" +
+      "\022\026\n\016normalComplete\030\n \001(\0052F\n\020CubeVisitSer" +
+      "vice\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.C" +
+      "ubeVisitResponseB`\nEorg.apache.kylin.sto",
+      "rage.hbase.cube.v2.coprocessor.endpoint." +
+      "generatedB\017CubeVisitProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4331,7 +4453,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "Behavior", "GtScanRequest", 
"HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", 
"Timeout", });
+              new java.lang.String[] { "Behavior", "GtScanRequest", 
"HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", 
"Timeout", "UseCompression", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             
internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index ecaad35..f16db7b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -37,6 +37,7 @@ message CubeVisitRequest {
     repeated IntList hbaseColumnsToGT = 5;
     required int64 startTime = 6;//when client start the request
     required int64 timeout = 7;//how long client will wait
+    optional bool useCompression = 8; // compress result or not
     message IntList {
         repeated int32 ints = 1;
     }

Reply via email to