KYLIN-1719 Add config in scan request to control compress the query result or not
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/af5965cc Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/af5965cc Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/af5965cc Branch: refs/heads/master Commit: af5965ccb76342a35b1313a1f606108f6db66ef7 Parents: 3adecb0 Author: sunyerui <sunye...@gmail.com> Authored: Sat May 21 13:48:40 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun May 22 14:14:43 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +- .../coprocessor/endpoint/CubeVisitService.java | 6 +- .../endpoint/generated/CubeVisitProtos.java | 156 +++++++++++++++++-- .../endpoint/protobuf/CubeVisit.proto | 1 + 5 files changed, 156 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 76961c5..5355086 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -680,4 +680,8 @@ abstract public class KylinConfigBase implements Serializable { return result; } + public boolean getCompressionResult() { + return Boolean.parseBoolean(getOptional("kylin.query.endpoint.compression.result", "true")); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 12b1fa8..8e37361 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -320,6 +320,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final AtomicInteger totalScannedCount = new AtomicInteger(0); final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum); + final boolean compressionResult = cubeSeg.getCubeDesc().getConfig().getCompressionResult(); final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString); for (IntList intList : hbaseColumnsToGTIntList) { @@ -329,6 +330,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { builder.setBehavior(toggle); builder.setStartTime(System.currentTimeMillis()); builder.setTimeout(epResultItr.getTimeout()); + builder.setUseCompression(compressionResult); for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) { executorService.submit(new Runnable() { @@ -353,7 +355,11 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { abnormalFinish = true; } else { try { - epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); + if (compressionResult) { + epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows()))); + } else { + epResultItr.append(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())); + } } catch (IOException | DataFormatException e) { throw new RuntimeException(logHeader + "Error when decompressing", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 0cd35f1..510e735 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -315,7 +315,11 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement } else { allRows = new byte[0]; } - compressedAllRows = CompressionUtils.compress(allRows); + if (request.hasUseCompression() && request.getUseCompression() == false) { + compressedAllRows = allRows; + } else { + compressedAllRows = CompressionUtils.compress(allRows); + } appendProfileInfo(sb, "compress done"); http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java index 53393e8..1a9bf05 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java @@ -116,6 +116,24 @@ public final class CubeVisitProtos { * </pre> */ long getTimeout(); + + // optional bool useCompression = 8; + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + boolean hasUseCompression(); + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + boolean getUseCompression(); } /** * Protobuf type {@code CubeVisitRequest} @@ -206,6 +224,11 @@ public final class CubeVisitProtos { timeout_ = input.readInt64(); break; } + case 64: { + bitField0_ |= 0x00000040; + useCompression_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -928,6 +951,30 @@ public final class CubeVisitProtos { return timeout_; } + // optional bool useCompression = 8; + public static final int USECOMPRESSION_FIELD_NUMBER = 8; + private boolean useCompression_; + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public boolean hasUseCompression() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public boolean getUseCompression() { + return useCompression_; + } + private void initFields() { behavior_ = ""; gtScanRequest_ = com.google.protobuf.ByteString.EMPTY; @@ -936,6 +983,7 @@ public final class CubeVisitProtos { hbaseColumnsToGT_ = java.util.Collections.emptyList(); startTime_ = 0L; timeout_ = 0L; + useCompression_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -994,6 +1042,9 @@ public final class CubeVisitProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeInt64(7, timeout_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeBool(8, useCompression_); + } getUnknownFields().writeTo(output); } @@ -1031,6 +1082,10 @@ public final class CubeVisitProtos { size += com.google.protobuf.CodedOutputStream .computeInt64Size(7, timeout_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, useCompression_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1086,6 +1141,11 @@ public final class CubeVisitProtos { result = result && (getTimeout() == other.getTimeout()); } + result = result && (hasUseCompression() == other.hasUseCompression()); + if (hasUseCompression()) { + result = result && (getUseCompression() + == other.getUseCompression()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1127,6 +1187,10 @@ public final class CubeVisitProtos { hash = (37 * hash) + TIMEOUT_FIELD_NUMBER; hash = (53 * hash) + hashLong(getTimeout()); } + if (hasUseCompression()) { + hash = (37 * hash) + USECOMPRESSION_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getUseCompression()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1255,6 +1319,8 @@ public final class CubeVisitProtos { bitField0_ = (bitField0_ & ~0x00000020); timeout_ = 0L; bitField0_ = (bitField0_ & ~0x00000040); + useCompression_ = false; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -1316,6 +1382,10 @@ public final class CubeVisitProtos { to_bitField0_ |= 0x00000020; } result.timeout_ = timeout_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000040; + } + result.useCompression_ = useCompression_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1378,6 +1448,9 @@ public final class CubeVisitProtos { if (other.hasTimeout()) { setTimeout(other.getTimeout()); } + if (other.hasUseCompression()) { + setUseCompression(other.getUseCompression()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1946,6 +2019,55 @@ public final class CubeVisitProtos { return this; } + // optional bool useCompression = 8; + private boolean useCompression_ ; + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public boolean hasUseCompression() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public boolean getUseCompression() { + return useCompression_; + } + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public Builder setUseCompression(boolean value) { + bitField0_ |= 0x00000080; + useCompression_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool useCompression = 8;</code> + * + * <pre> + * compress result or not + * </pre> + */ + public Builder clearUseCompression() { + bitField0_ = (bitField0_ & ~0x00000080); + useCompression_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:CubeVisitRequest) } @@ -4300,26 +4422,26 @@ public final class CubeVisitProtos { java.lang.String[] descriptorData = { "\npstorage-hbase/src/main/java/org/apache" + "/kylin/storage/hbase/cube/v2/coprocessor" + - "/endpoint/protobuf/CubeVisit.proto\"\337\001\n\020C" + + "/endpoint/protobuf/CubeVisit.proto\"\367\001\n\020C" + "ubeVisitRequest\022\020\n\010behavior\030\001 \002(\t\022\025\n\rgtS" + "canRequest\030\002 \002(\014\022\024\n\014hbaseRawScan\030\003 \002(\014\022\032" + "\n\022rowkeyPreambleSize\030\004 \002(\005\0223\n\020hbaseColum" + "nsToGT\030\005 \003(\0132\031.CubeVisitRequest.IntList\022" + - "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\032\027\n\007I" + - "ntList\022\014\n\004ints\030\001 \003(\005\"\321\002\n\021CubeVisitRespon" + - "se\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(", - "\0132\030.CubeVisitResponse.Stats\032\372\001\n\005Stats\022\030\n" + - "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" + - "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\005\022\032\n\022aggr" + - "egatedRowCount\030\004 \001(\005\022\025\n\rsystemCpuLoad\030\005 " + - "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" + - "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" + - "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005" + - "2F\n\020CubeVisitService\0222\n\tvisitCube\022\021.Cube" + - "VisitRequest\032\022.CubeVisitResponseB`\nEorg." + - "apache.kylin.storage.hbase.cube.v2.copro", - "cessor.endpoint.generatedB\017CubeVisitProt" + - "osH\001\210\001\001\240\001\001" + "\021\n\tstartTime\030\006 \002(\003\022\017\n\007timeout\030\007 \002(\003\022\026\n\016u" + + "seCompression\030\010 \001(\010\032\027\n\007IntList\022\014\n\004ints\030\001" + + " \003(\005\"\321\002\n\021CubeVisitResponse\022\026\n\016compressed", + "Rows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeVisitRes" + + "ponse.Stats\032\372\001\n\005Stats\022\030\n\020serviceStartTim" + + "e\030\001 \001(\003\022\026\n\016serviceEndTime\030\002 \001(\003\022\027\n\017scann" + + "edRowCount\030\003 \001(\005\022\032\n\022aggregatedRowCount\030\004" + + " \001(\005\022\025\n\rsystemCpuLoad\030\005 \001(\001\022\036\n\026freePhysi" + + "calMemorySize\030\006 \001(\001\022\031\n\021freeSwapSpaceSize" + + "\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t" + + "\022\026\n\016normalComplete\030\n \001(\0052F\n\020CubeVisitSer" + + "vice\0222\n\tvisitCube\022\021.CubeVisitRequest\032\022.C" + + "ubeVisitResponseB`\nEorg.apache.kylin.sto", + "rage.hbase.cube.v2.coprocessor.endpoint." + + "generatedB\017CubeVisitProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4331,7 +4453,7 @@ public final class CubeVisitProtos { internal_static_CubeVisitRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CubeVisitRequest_descriptor, - new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", }); + new java.lang.String[] { "Behavior", "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "StartTime", "Timeout", "UseCompression", }); internal_static_CubeVisitRequest_IntList_descriptor = internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0); internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/kylin/blob/af5965cc/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto index ecaad35..f16db7b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto @@ -37,6 +37,7 @@ message CubeVisitRequest { repeated IntList hbaseColumnsToGT = 5; required int64 startTime = 6;//when client start the request required int64 timeout = 7;//how long client will wait + optional bool useCompression = 8; // compress result or not message IntList { repeated int32 ints = 1; }