spark cubing init commit
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/78e6cd5b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/78e6cd5b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/78e6cd5b Branch: refs/heads/sparkcubing-rebase Commit: 78e6cd5b771d96744e379e66913277e54632552b Parents: 73fbbb9 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Dec 25 15:59:16 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Dec 25 19:17:32 2016 +0800 ---------------------------------------------------------------------- assembly/pom.xml | 5 +- .../main/config/assemblies/source-assembly.xml | 3 - .../java/org/apache/kylin/job/DeployUtil.java | 1 - build/conf/kylin-spark-conf.properties | 27 ++ .../apache/kylin/common/KylinConfigBase.java | 38 +- .../kylin/common/persistence/ResourceStore.java | 35 ++ .../org/apache/kylin/common/util/Array.java | 2 +- .../apache/kylin/common/util/SplittedBytes.java | 2 +- .../java/org/apache/kylin/cube/CubeSegment.java | 2 +- .../kylin/cube/common/RowKeySplitter.java | 12 +- .../org/apache/kylin/cube/cuboid/Cuboid.java | 27 +- .../kylin/cube/cuboid/CuboidScheduler.java | 15 +- .../kylin/cube/kv/AbstractRowKeyEncoder.java | 6 +- .../org/apache/kylin/cube/kv/CubeDimEncMap.java | 7 +- .../apache/kylin/cube/kv/RowKeyColumnIO.java | 2 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 19 +- .../kylin/cube/model/AggregationGroup.java | 4 +- .../org/apache/kylin/cube/model/CubeDesc.java | 2 +- .../cube/model/CubeJoinedFlatTableDesc.java | 2 +- .../cube/model/CubeJoinedFlatTableEnrich.java | 6 +- .../apache/kylin/cube/model/DictionaryDesc.java | 2 +- .../apache/kylin/cube/model/DimensionDesc.java | 2 +- .../kylin/cube/model/HBaseColumnDesc.java | 11 +- .../kylin/cube/model/HBaseColumnFamilyDesc.java | 9 +- .../kylin/cube/model/HBaseMappingDesc.java | 15 +- .../apache/kylin/cube/model/HierarchyDesc.java | 2 +- .../apache/kylin/cube/model/RowKeyColDesc.java | 2 +- .../org/apache/kylin/cube/model/RowKeyDesc.java | 17 +- .../org/apache/kylin/cube/model/SelectRule.java | 2 +- .../dict/NumberDictionaryForestBuilder.java | 2 +- .../apache/kylin/dict/StringBytesConverter.java | 2 +- .../kylin/job/execution/ExecutableManager.java | 21 +- .../kylin/dimension/AbstractDateDimEnc.java | 12 +- .../apache/kylin/dimension/BooleanDimEnc.java | 4 +- .../kylin/dimension/DictionaryDimEnc.java | 21 +- .../apache/kylin/dimension/FixedLenDimEnc.java | 4 +- .../kylin/dimension/FixedLenHexDimEnc.java | 4 +- .../org/apache/kylin/dimension/IntDimEnc.java | 4 +- .../apache/kylin/dimension/IntegerDimEnc.java | 4 +- .../kylin/dimension/OneMoreByteVLongDimEnc.java | 14 +- .../kylin/measure/BufferedMeasureCodec.java | 10 +- .../org/apache/kylin/measure/MeasureCodec.java | 8 +- .../apache/kylin/measure/MeasureIngester.java | 12 +- .../org/apache/kylin/measure/MeasureType.java | 12 +- .../kylin/measure/basic/DoubleIngester.java | 5 + .../measure/basic/DoubleSumAggregator.java | 3 +- .../kylin/measure/basic/LongIngester.java | 5 + .../kylin/measure/basic/LongSumAggregator.java | 3 +- .../kylin/measure/bitmap/BitmapCounter.java | 8 +- .../kylin/measure/bitmap/BitmapMeasureType.java | 5 + .../kylin/measure/hllc/HLLCMeasureType.java | 5 + .../kylin/measure/hllc/HLLCSerializer.java | 11 +- .../apache/kylin/measure/raw/RawSerializer.java | 4 +- .../apache/kylin/measure/topn/TopNCounter.java | 2 +- .../kylin/measure/topn/TopNMeasureType.java | 4 +- .../metadata/datatype/BigDecimalSerializer.java | 2 +- .../metadata/datatype/BooleanSerializer.java | 9 +- .../metadata/datatype/DataTypeSerializer.java | 17 +- .../metadata/datatype/DateTimeSerializer.java | 9 +- .../metadata/datatype/DoubleSerializer.java | 5 +- .../kylin/metadata/datatype/Int4Serializer.java | 9 +- .../metadata/datatype/Long8Serializer.java | 9 +- .../kylin/metadata/datatype/LongSerializer.java | 9 +- .../kylin/metadata/model/FunctionDesc.java | 22 +- .../apache/kylin/metadata/model/JoinDesc.java | 7 +- .../kylin/metadata/model/JoinTableDesc.java | 4 +- .../apache/kylin/metadata/model/JoinsTree.java | 9 +- .../kylin/metadata/model/MeasureDesc.java | 9 +- .../metadata/model/ModelDimensionDesc.java | 10 +- .../kylin/metadata/model/ParameterDesc.java | 13 +- .../kylin/metadata/model/PartitionDesc.java | 6 +- .../apache/kylin/metadata/model/TableRef.java | 17 +- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 9 +- .../kylin/engine/mr/JobBuilderSupport.java | 12 +- .../engine/mr/common/BaseCuboidBuilder.java | 175 +++++++++ .../kylin/engine/mr/common/NDCuboidBuilder.java | 96 +++++ .../engine/mr/steps/BaseCuboidMapperBase.java | 131 +------ .../kylin/engine/mr/steps/NDCuboidMapper.java | 58 +-- engine-spark/pom.xml | 5 + .../engine/spark/SparkBatchCubingEngine2.java | 33 ++ .../spark/SparkBatchCubingJobBuilder2.java | 85 +++++ .../apache/kylin/engine/spark/SparkCubing.java | 79 ++++- .../kylin/engine/spark/SparkCubingV3.java | 354 +++++++++++++++++++ .../kylin/engine/spark/SparkExecutable.java | 26 +- .../sandbox/kylin-spark-conf.properties | 27 ++ .../test_case_data/sandbox/kylin.properties | 11 +- pom.xml | 2 +- .../kylin/rest/controller/CubeController.java | 8 +- server/pom.xml | 7 + .../apache/kylin/source/hive/HiveMRInput.java | 5 +- tool/pom.xml | 4 + 91 files changed, 1299 insertions(+), 462 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/assembly/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/pom.xml b/assembly/pom.xml index 73f9e12..fc49a62 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -47,6 +47,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-mr</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-spark</artifactId> + </dependency> <!-- Env & Test --> <dependency> @@ -173,7 +177,6 @@ <shadedClassifierName>job</shadedClassifierName> <artifactSet> <excludes> - <exclude>io.netty:*</exclude> <exclude>org.apache.zookeeper:*</exclude> <exclude>net.sf.ehcache:*</exclude> <exclude>org.apache.httpcomponents:*</exclude> http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/assembly/src/main/config/assemblies/source-assembly.xml ---------------------------------------------------------------------- diff --git a/assembly/src/main/config/assemblies/source-assembly.xml b/assembly/src/main/config/assemblies/source-assembly.xml index fad45aa..92584b5 100644 --- a/assembly/src/main/config/assemblies/source-assembly.xml +++ b/assembly/src/main/config/assemblies/source-assembly.xml @@ -97,9 +97,6 @@ limitations under the License. <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?docs(/.*)?] </exclude> - <!-- exclude unmaintained --> - <exclude>%regex[(?!((?!${project.build.directory}/)))?engine-spark(/.*)?] - </exclude> </excludes> </fileSet> <!-- LICENSE, NOTICE, DEPENDENCIES, git.properties, etc. calculated at build time --> http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 089c3ed..d1abfeb 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -82,7 +82,6 @@ public class DeployUtil { config().overrideMRJobJarPath(jobJar.getAbsolutePath()); config().overrideCoprocessorLocalJar(coprocessorJar.getAbsolutePath()); - config().overrideSparkJobJarPath(getSparkJobJarFile().getAbsolutePath()); } private static String getPomVersion() { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/build/conf/kylin-spark-conf.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties new file mode 100644 index 0000000..7f53f50 --- /dev/null +++ b/build/conf/kylin-spark-conf.properties @@ -0,0 +1,27 @@ +spark.yarn.submit.file.replication=1 +spark.yarn.executor.memoryOverhead=200 +spark.yarn.driver.memoryOverhead=384 +#spark.master=local[4] +spark.master=yarn +spark.submit.deployMode=cluster +spark.eventLog.enabled=true +spark.yarn.scheduler.heartbeat.interval-ms=5000 +spark.yarn.preserve.staging.files=true +spark.yarn.queue=default +spark.yarn.containerLauncherMaxThreads=25 +spark.yarn.max.executor.failures=3 +spark.eventLog.dir=hdfs\:///kylin/spark-history +spark.history.kerberos.enabled=true +spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider +spark.history.ui.port=18080 +spark.history.fs.logDirectory=hdfs\:///kylin/spark-history +spark.executor.memory=1G +spark.storage.memoryFraction=0.3 +spark.executor.cores=1 +spark.executor.instances=1 +spark.history.kerberos.keytab=none +spark.history.kerberos.principal=none +#spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar +spark.driver.extraJavaOptions=-Dhdp.version=current +spark.yarn.am.extraJavaOptions=-Dhdp.version=current +spark.executor.extraJavaOptions=-Dhdp.version=current http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index da93388..c32550f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -636,6 +636,7 @@ abstract public class KylinConfigBase implements Serializable { // ref constants in IEngineAware r.put(0, "org.apache.kylin.engine.mr.MRBatchCubingEngine"); r.put(2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2"); + r.put(4, "org.apache.kylin.engine.spark.SparkBatchCubingEngine2"); return r; } @@ -712,29 +713,32 @@ abstract public class KylinConfigBase implements Serializable { // ENGINE.SPARK // ============================================================================ - public String getKylinSparkJobJarPath() { - final String jobJar = getOptional("kylin.engine.spark.job-jar"); - if (StringUtils.isNotEmpty(jobJar)) { - return jobJar; - } - String kylinHome = getKylinHome(); - if (StringUtils.isEmpty(kylinHome)) { - return ""; - } - return getFileName(kylinHome + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN); + public String getSparkHome() { + return getRequired("kylin.engine.spark.spark-home"); } - public void overrideSparkJobJarPath(String path) { - logger.info("override " + "kylin.engine.spark.job-jar" + " to " + path); - System.setProperty("kylin.engine.spark.job-jar", path); + public String getSparkHadoopConfDir() { + return getRequired("kylin.engine.spark.env.hadoop-conf-dir"); } - public String getSparkHome() { - return getRequired("kylin.engine.spark.spark-home"); + public String getSparkConfFile() { + String conf = getOptional("kylin.engine.spark.properties-file", "conf/kylin-spark-conf.properties"); + File f = new File(conf); + if (f.exists()) { + return f.getAbsolutePath(); + } else { + String home = getKylinHome(); + f = new File(home, conf); + if (f.exists()) { + return f.getAbsolutePath(); + } + } + + throw new IllegalArgumentException("Spark conf file '" + conf + "' does not exist."); } - public String getSparkMaster() { - return getRequired("kylin.engine.spark.spark-master"); + public String getSparkAdditionalJars() { + return getOptional("kylin.engine.spark.additional-jars", ""); } // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 0580576..9549569 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -21,17 +21,21 @@ package org.apache.kylin.common.persistence; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.OptionsHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -311,4 +315,35 @@ abstract public class ResourceStore { return collector; } + public static String dumpResources(KylinConfig kylinConfig, Collection<String> dumpList) throws IOException { + File tmp = File.createTempFile("kylin_job_meta", ""); + FileUtils.forceDelete(tmp); // we need a directory, so delete the file first + + File metaDir = new File(tmp, "meta"); + metaDir.mkdirs(); + + // write kylin.properties + File kylinPropsFile = new File(metaDir, "kylin.properties"); + kylinConfig.writeProperties(kylinPropsFile); + + ResourceStore from = ResourceStore.getStore(kylinConfig); + KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); + ResourceStore to = ResourceStore.getStore(localConfig); + for (String path : dumpList) { + RawResource res = from.getResource(path); + if (res == null) + throw new IllegalStateException("No resource found at -- " + path); + to.putResource(path, res.inputStream, res.timestamp); + res.inputStream.close(); + } + + String metaDirURI = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()); + if (metaDirURI.startsWith("/")) // note Path on windows is like "d:/../..." + metaDirURI = "file://" + metaDirURI; + else + metaDirURI = "file:///" + metaDirURI; + logger.info("meta dir is: " + metaDirURI); + + return metaDirURI; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-common/src/main/java/org/apache/kylin/common/util/Array.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Array.java b/core-common/src/main/java/org/apache/kylin/common/util/Array.java index 7447b46..b25b764 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Array.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Array.java @@ -23,7 +23,7 @@ import java.util.Arrays; /* * An array with correct equals(), hashCode(), compareTo() and toString() */ -public class Array<T> implements Comparable<Array<T>> { +public class Array<T> implements Comparable<Array<T>>, java.io.Serializable { public T[] data; public Array(T[] data) { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java b/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java index 8751b78..ae380cb 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java @@ -22,7 +22,7 @@ package org.apache.kylin.common.util; * @author George Song (ysong1) * */ -public class SplittedBytes { +public class SplittedBytes implements java.io.Serializable { public SplittedBytes(int length) { this.value = new byte[length]; this.length = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 36a6044..45310f0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -51,7 +51,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegment { +public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegment, java.io.Serializable { @JsonBackReference private CubeInstance cubeInstance; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java index 67f1751..3602fff 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java @@ -26,9 +26,11 @@ import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.kv.RowKeyColumnIO; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.RowKeyColDesc; +import org.apache.kylin.dimension.IDimensionEncodingMap; import org.apache.kylin.metadata.model.TblColRef; -public class RowKeySplitter { +public class RowKeySplitter implements java.io.Serializable { private CubeDesc cubeDesc; private RowKeyColumnIO colIO; @@ -64,7 +66,13 @@ public class RowKeySplitter { public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) { this.enableSharding = cubeSeg.isEnableSharding(); this.cubeDesc = cubeSeg.getCubeDesc(); - this.colIO = new RowKeyColumnIO(new CubeDimEncMap(cubeSeg)); + IDimensionEncodingMap dimEncoding = new CubeDimEncMap(cubeSeg); + + for (RowKeyColDesc rowKeyColDesc : cubeDesc.getRowkey().getRowKeyColumns()) { + dimEncoding.get(rowKeyColDesc.getColRef()); + } + + this.colIO = new RowKeyColumnIO(dimEncoding); this.splitBuffers = new SplittedBytes[splitLen]; this.splitOffsets = new int[splitLen]; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index 7503fbf..dd22d6a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -18,15 +18,10 @@ package org.apache.kylin.cube.cuboid; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; @@ -37,12 +32,16 @@ import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; -public class Cuboid implements Comparable<Cuboid> { +public class Cuboid implements Comparable<Cuboid>, java.io.Serializable { private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java index 733aded..ffb0a5e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java @@ -21,6 +21,12 @@ package org.apache.kylin.cube.cuboid; /** */ +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.kylin.cube.model.AggregationGroup; +import org.apache.kylin.cube.model.CubeDesc; + import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -28,14 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.kylin.cube.model.AggregationGroup; -import org.apache.kylin.cube.model.CubeDesc; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -public class CuboidScheduler { +public class CuboidScheduler implements java.io.Serializable { private final CubeDesc cubeDesc; private final long max; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java index bfe6eb4..2becde4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java @@ -18,8 +18,6 @@ package org.apache.kylin.cube.kv; -import java.util.Map; - import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; @@ -30,12 +28,14 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** * * @author xjiang * */ -public abstract class AbstractRowKeyEncoder { +public abstract class AbstractRowKeyEncoder implements java.io.Serializable { protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class); public static final byte DEFAULT_BLANK_BYTE = DimensionEncoding.NULL; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java index a4d2d6f..bd9554a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java @@ -18,8 +18,7 @@ package org.apache.kylin.cube.kv; -import java.util.Map; - +import com.google.common.collect.Maps; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; @@ -33,9 +32,9 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; +import java.util.Map; -public class CubeDimEncMap implements IDimensionEncodingMap { +public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializable { private static final Logger logger = LoggerFactory.getLogger(CubeDimEncMap.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java index fbb93db..65911a0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java @@ -29,7 +29,7 @@ import org.apache.kylin.metadata.model.TblColRef; * * @author yangli9 */ -public class RowKeyColumnIO { +public class RowKeyColumnIO implements java.io.Serializable { //private static final Logger logger = LoggerFactory.getLogger(RowKeyColumnIO.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index bf20de1..a669fb1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -18,11 +18,7 @@ package org.apache.kylin.cube.kv; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.google.common.base.Preconditions; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; @@ -32,9 +28,12 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.TblColRef; -import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; -public class RowKeyEncoder extends AbstractRowKeyEncoder { +public class RowKeyEncoder extends AbstractRowKeyEncoder implements java.io.Serializable { private int bodyLength = 0; private RowKeyColumnIO colIO; @@ -42,11 +41,13 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { protected boolean enableSharding; private int uhcOffset = -1;//it's a offset to the beginning of body private int uhcLength = -1; + private int headerLength; public RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) { super(cubeSeg, cuboid); enableSharding = cubeSeg.isEnableSharding(); - Set<TblColRef> shardByColumns = cubeSeg.getShardByColumns(); + headerLength = cubeSeg.getRowKeyPreambleSize(); + Set<TblColRef> shardByColumns = cubeSeg.getCubeDesc().getShardByColumns(); if (shardByColumns.size() > 1) { throw new IllegalStateException("Does not support multiple UHC now"); } @@ -61,7 +62,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { } public int getHeaderLength() { - return cubeSeg.getRowKeyPreambleSize(); + return headerLength; } public int getBytesLength() { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java b/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java index 9bd082f..5f3c92c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java @@ -35,8 +35,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class AggregationGroup { - public static class HierarchyMask { +public class AggregationGroup implements java.io.Serializable { + public static class HierarchyMask implements java.io.Serializable { public long fullMask; // 00000111 public long[] allMasks; // 00000100,00000110,00000111 public long[] dims; // 00000100,00000010,00000001 http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 3b8d034..c9ad8bc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -100,7 +100,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { LOOKUP, PK_FK, EXTENDED_COLUMN } - public static class DeriveInfo { + public static class DeriveInfo implements java.io.Serializable { public DeriveType type; public JoinDesc join; public TblColRef[] columns; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 94e1a7c..2a68116 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -36,7 +36,7 @@ import com.google.common.collect.Maps; /** */ -public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { +public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, java.io.Serializable { protected final String tableName; protected final CubeDesc cubeDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index a1312b5..e829aeb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -18,8 +18,6 @@ package org.apache.kylin.cube.model; -import java.util.List; - import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; @@ -28,10 +26,12 @@ import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import java.util.List; + /** * An enrich of IJoinedFlatTableDesc for cubes */ -public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { +public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, java.io.Serializable { private CubeDesc cubeDesc; private IJoinedFlatTableDesc flatDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java index f471f9f..ca2183a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class DictionaryDesc { +public class DictionaryDesc implements java.io.Serializable { @JsonProperty("column") private String column; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java index cd75228..93bf4bd 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java @@ -35,7 +35,7 @@ import com.google.common.base.Objects; /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class DimensionDesc { +public class DimensionDesc implements java.io.Serializable { @JsonProperty("name") private String name; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java index fb491f8..7007342 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java @@ -18,19 +18,18 @@ package org.apache.kylin.cube.model; -import java.util.Arrays; - -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; + +import java.util.Arrays; /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class HBaseColumnDesc { +public class HBaseColumnDesc implements java.io.Serializable { @JsonProperty("qualifier") private String qualifier; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java index c5b2e19..85c2c17 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java @@ -18,18 +18,17 @@ package org.apache.kylin.cube.model; -import java.util.Arrays; - -import org.apache.kylin.metadata.model.MeasureDesc; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kylin.metadata.model.MeasureDesc; + +import java.util.Arrays; /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class HBaseColumnFamilyDesc { +public class HBaseColumnFamilyDesc implements java.io.Serializable { @JsonProperty("name") private String name; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java index 2ef1e17..9ad8407 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java @@ -18,22 +18,21 @@ package org.apache.kylin.cube.model; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; - +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class HBaseMappingDesc { +public class HBaseMappingDesc implements java.io.Serializable { @JsonProperty("column_family") private HBaseColumnFamilyDesc[] columnFamily; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java index f88d4d2..b72f220 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class HierarchyDesc { +public class HierarchyDesc implements java.io.Serializable { @JsonProperty("level") private String level; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java index ef34a9b..71fd4b9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java @@ -40,7 +40,7 @@ import com.google.common.base.Preconditions; * */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class RowKeyColDesc { +public class RowKeyColDesc implements java.io.Serializable { @JsonProperty("column") private String column; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java index f1a403d..00557c5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java @@ -18,24 +18,23 @@ package org.apache.kylin.cube.model; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; +import org.apache.commons.lang.ArrayUtils; +import org.apache.kylin.metadata.model.TblColRef; + import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.commons.lang.ArrayUtils; -import org.apache.kylin.metadata.model.TblColRef; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; - /** */ @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class RowKeyDesc { +public class RowKeyDesc implements java.io.Serializable { @JsonProperty("rowkey_columns") private RowKeyColDesc[] rowkeyColumns; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java index 63b0fc4..4a6c510 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java @@ -20,7 +20,7 @@ package org.apache.kylin.cube.model; import com.fasterxml.jackson.annotation.JsonProperty; -public class SelectRule { +public class SelectRule implements java.io.Serializable { @JsonProperty("hierarchy_dims") public String[][] hierarchy_dims; @JsonProperty("mandatory_dims") http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java index 5502a74..60c9f59 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java @@ -26,7 +26,7 @@ import org.apache.kylin.dict.NumberDictionary.NumberBytesCodec; */ public class NumberDictionaryForestBuilder extends TrieDictionaryForestBuilder<String> { - public static class Number2BytesConverter implements BytesConverter<String> { + public static class Number2BytesConverter implements BytesConverter<String>, java.io.Serializable { static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = NumberDictionary.MAX_DIGITS_BEFORE_DECIMAL_POINT; static final ThreadLocal<NumberBytesCodec> LOCAL = new ThreadLocal<NumberBytesCodec>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java b/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java index 0bec6a1..9107a4c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java @@ -20,7 +20,7 @@ package org.apache.kylin.dict; import org.apache.kylin.common.util.Bytes; -public class StringBytesConverter implements BytesConverter<String> { +public class StringBytesConverter implements BytesConverter<String>, java.io.Serializable { @Override public byte[] convertToBytes(String v) { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 0273fd8..466cdad 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -18,12 +18,9 @@ package org.apache.kylin.job.execution; -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.job.dao.ExecutableDao; @@ -34,9 +31,11 @@ import org.apache.kylin.job.exception.PersistentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** */ @@ -352,6 +351,10 @@ public class ExecutableManager { } } } + + if (job.getStatus() == ExecutableState.SUCCEED) { + updateJobOutput(job.getId(), ExecutableState.READY, null, null); + } } public void pauseJob(String jobId) { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java index ec6347f..a54bcda 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java @@ -18,6 +18,10 @@ package org.apache.kylin.dimension; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -25,10 +29,6 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; - public class AbstractDateDimEnc extends DimensionEncoding { private static final long serialVersionUID = 1L; @@ -81,11 +81,9 @@ public class AbstractDateDimEnc extends DimensionEncoding { @Override public DataTypeSerializer<Object> asDataTypeSerializer() { return new DataTypeSerializer<Object>() { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[fixedLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java index c3f4c11..75e50a1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java @@ -109,11 +109,9 @@ public class BooleanDimEnc extends DimensionEncoding { } private class BooleanSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[fixedLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java index 500b410..48238dc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java @@ -39,12 +39,15 @@ public class DictionaryDimEnc extends DimensionEncoding { // ============================================================================ // could use a lazy loading trick here, to prevent loading all dictionaries of a segment at once - private final Dictionary<String> dict; - private final int fixedLen; + private Dictionary<String> dict; + private int fixedLen; // used in encode(), when a value does not exist in dictionary - private final int roundingFlag; - private final byte defaultByte; + private int roundingFlag; + private byte defaultByte; + + public DictionaryDimEnc() { + } public DictionaryDimEnc(Dictionary<String> dict) { this(dict, 0, NULL); @@ -145,12 +148,18 @@ public class DictionaryDimEnc extends DimensionEncoding { @Override public void writeExternal(ObjectOutput out) throws IOException { - throw new UnsupportedOperationException(); + out.writeInt(fixedLen); + out.writeInt(roundingFlag); + out.write(defaultByte); + out.writeObject(dict); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - throw new UnsupportedOperationException(); + this.fixedLen = in.readInt(); + this.roundingFlag = in.readInt(); + this.defaultByte = in.readByte(); + this.dict = (Dictionary<String>) in.readObject(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java index b219766..f7f02a0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java @@ -129,11 +129,9 @@ public class FixedLenDimEnc extends DimensionEncoding { } public class FixedLenSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[fixedLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java index 83118fc..f90a40e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java @@ -224,11 +224,9 @@ public class FixedLenHexDimEnc extends DimensionEncoding { } public class FixedLenSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[bytelen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java index f25f2a6..aa954da 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java @@ -109,11 +109,9 @@ public class IntDimEnc extends DimensionEncoding { } public class IntegerSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[fixedLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java index 090dc83..0875a7f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java @@ -135,11 +135,9 @@ public class IntegerDimEnc extends DimensionEncoding { } public class IntegerSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[fixedLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java index 993aac3..c0c52d1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java @@ -18,17 +18,17 @@ package org.apache.kylin.dimension; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.nio.ByteBuffer; import java.util.Arrays; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * not being used yet, prepared for future */ @@ -118,11 +118,9 @@ public class OneMoreByteVLongDimEnc extends DimensionEncoding { } public class VLongSerializer extends DataTypeSerializer<Object> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>(); private byte[] currentBuf() { - byte[] buf = current.get(); + byte[] buf = (byte[]) current.get(); if (buf == null) { buf = new byte[byteLen]; current.set(buf); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java index 8a5481c..44e5708 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java @@ -18,26 +18,26 @@ package org.apache.kylin.measure; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.MeasureDesc; + import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.util.Collection; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.MeasureDesc; - /** * This class embeds a reusable byte buffer for measure encoding, and is not thread-safe. * The buffer will grow to accommodate BufferOverflowException until a limit. * The problem here to solve is some measure type cannot provide accurate DataTypeSerializer.maxLength() */ @SuppressWarnings({ "unchecked" }) -public class BufferedMeasureCodec { +public class BufferedMeasureCodec implements java.io.Serializable { public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB public static final int MAX_BUFFER_SIZE = 1 * 1024 * DEFAULT_BUFFER_SIZE; // 1 GB final private MeasureCodec codec; - private ByteBuffer buf; + private transient ByteBuffer buf; final private int[] measureSizes; public BufferedMeasureCodec(Collection<MeasureDesc> measureDescs) { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java index edaf806..2d73e59 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java @@ -18,19 +18,19 @@ package org.apache.kylin.measure; -import java.nio.ByteBuffer; -import java.util.Collection; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.apache.kylin.metadata.model.MeasureDesc; +import java.nio.ByteBuffer; +import java.util.Collection; + /** * @author yangli9 * */ @SuppressWarnings({ "rawtypes" }) -public class MeasureCodec { +public class MeasureCodec implements java.io.Serializable { private int nMeasures; private DataTypeSerializer[] serializers; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java index 0076252..26b7298 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java @@ -18,14 +18,14 @@ package org.apache.kylin.measure; -import java.util.Collection; -import java.util.Map; - import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -abstract public class MeasureIngester<V> { +import java.util.Collection; +import java.util.Map; + +abstract public class MeasureIngester<V> implements java.io.Serializable { public static MeasureIngester<?> create(MeasureDesc measure) { return measure.getFunction().getMeasureType().newIngester(); @@ -42,6 +42,10 @@ abstract public class MeasureIngester<V> { abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap); + public void reset() { + + } + public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 89ff382..3338c8c 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -18,11 +18,6 @@ package org.apache.kylin.measure; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -32,13 +27,18 @@ import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * MeasureType captures how a kind of aggregation is defined, how it is calculated * during cube build, and how it is involved in query and storage scan. * * @param <T> the Java type of aggregation data object, e.g. HLLCounter */ -abstract public class MeasureType<T> { +abstract public class MeasureType<T> implements java.io.Serializable { /* ============================================================================ * Define http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java index a2f3980..e42f275 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java @@ -43,4 +43,9 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> { l.set(Double.parseDouble(values[0])); return l; } + + @Override + public void reset() { + current = new DoubleMutable(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java index f276817..29eb787 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java @@ -30,7 +30,8 @@ public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> { @Override public void reset() { - sum.set(0.0); +// sum.set(0.0); + sum = new DoubleMutable(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java index 45a1634..439f096 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java @@ -43,4 +43,9 @@ public class LongIngester extends MeasureIngester<LongMutable> { l.set(Long.parseLong(values[0])); return l; } + + @Override + public void reset() { + current = new LongMutable(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java index e7fdc9d..0a56af8 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java @@ -30,7 +30,8 @@ public class LongSumAggregator extends MeasureAggregator<LongMutable> { @Override public void reset() { - sum.set(0); +// sum.set(0); + sum = new LongMutable(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java index 827390d..caab094 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java @@ -18,6 +18,9 @@ package org.apache.kylin.measure.bitmap; +import org.apache.commons.io.IOUtils; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -26,13 +29,10 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Iterator; -import org.apache.commons.io.IOUtils; -import org.roaringbitmap.buffer.MutableRoaringBitmap; - /** * Created by sunyerui on 15/12/1. */ -public class BitmapCounter implements Comparable<BitmapCounter> { +public class BitmapCounter implements Comparable<BitmapCounter>, java.io.Serializable { private MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index 8e2b2f7..6ad82a1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -138,6 +138,11 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { } return retValue; } + + @Override + public void reset() { + current = new BitmapCounter(); + } }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 9601653..b2d2ddc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -105,6 +105,11 @@ public class HLLCMeasureType extends MeasureType<HLLCounter> { } return hllc; } + + @Override + public void reset() { + current = new HyperLogLogPlusCounter(dataType.getPrecision()); + } }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index e0992c7..df0cfaf 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -18,21 +18,18 @@ package org.apache.kylin.measure.hllc; -import java.io.IOException; -import java.nio.ByteBuffer; - import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import java.io.IOException; +import java.nio.ByteBuffer; + /** * @author yangli9 * */ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<HLLCounter> current = new ThreadLocal<HLLCounter>(); - private int precision; public HLLCSerializer(DataType type) { @@ -49,7 +46,7 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> { } private HLLCounter current() { - HLLCounter hllc = current.get(); + HLLCounter hllc = (HLLCounter) current.get(); if (hllc == null) { hllc = new HLLCounter(precision); current.set(hllc); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java index 021c146..68a0273 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java @@ -34,13 +34,11 @@ public class RawSerializer extends DataTypeSerializer<List<ByteArray>> { //FIXME to config this and RowConstants.ROWVALUE_BUFFER_SIZE in properties file public static final int RAW_BUFFER_SIZE = 1024 * 1024;//1M - private ThreadLocal<List<ByteArray>> current = new ThreadLocal<>(); - public RawSerializer(DataType dataType) { } private List<ByteArray> current() { - List<ByteArray> l = current.get(); + List<ByteArray> l = (List<ByteArray>) current.get(); if (l == null) { l = new ArrayList<ByteArray>(); current.set(l); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java index caf7961..5e4b91e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java @@ -40,7 +40,7 @@ import com.google.common.collect.Maps; * * @param <T> type of data in the stream to be summarized */ -public class TopNCounter<T> implements Iterable<Counter<T>> { +public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializable { public static final int EXTRA_SPACE_RATE = 50; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index c29af6c..8c8b5a6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Dictionary; @@ -46,8 +47,6 @@ import org.apache.kylin.metadata.tuple.TupleInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { private static final Logger logger = LoggerFactory.getLogger(TopNMeasureType.class); @@ -156,6 +155,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { return topNCounter; } + @Override public TopNCounter<ByteArray> reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { TopNCounter<ByteArray> topNCounter = value; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java index 64968b8..b5043f5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java @@ -35,7 +35,7 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> { private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class); final DataType type; - transient final int maxLength; + final int maxLength; transient int avoidVerbose = 0; http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java index acb6de1..998e5e9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java @@ -18,18 +18,15 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; - import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.BooleanUtils; +import java.nio.ByteBuffer; + public class BooleanSerializer extends DataTypeSerializer<LongMutable> { public final static String[] TRUE_VALUE_SET = { "true", "t", "on", "yes" }; - // be thread-safe and avoid repeated obj creation - private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); - public BooleanSerializer(DataType type) { } @@ -39,7 +36,7 @@ public class BooleanSerializer extends DataTypeSerializer<LongMutable> { } private LongMutable current() { - LongMutable l = current.get(); + LongMutable l = (LongMutable) current.get(); if (l == null) { l = new LongMutable(); current.set(l); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java index a739377..a4a35a4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -18,19 +18,21 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; -import java.util.Map; - +import com.google.common.collect.Maps; import org.apache.kylin.common.util.BytesSerializer; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import java.util.Map; /** * Note: the implementations MUST be thread-safe. */ -abstract public class DataTypeSerializer<T> implements BytesSerializer<T> { +abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.io.Serializable { final static Map<String, Class<?>> implementations = Maps.newHashMap(); + protected transient ThreadLocal current = new ThreadLocal(); static { implementations.put("char", StringSerializer.class); implementations.put("varchar", StringSerializer.class); @@ -94,4 +96,9 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> { else return value.toString(); } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + current = new ThreadLocal(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java index 07f98b3..a5719bd 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java @@ -18,14 +18,11 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; - import org.apache.kylin.common.util.DateFormat; -public class DateTimeSerializer extends DataTypeSerializer<LongMutable> { +import java.nio.ByteBuffer; - // be thread-safe and avoid repeated obj creation - private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); +public class DateTimeSerializer extends DataTypeSerializer<LongMutable> { public DateTimeSerializer(DataType type) { } @@ -36,7 +33,7 @@ public class DateTimeSerializer extends DataTypeSerializer<LongMutable> { } private LongMutable current() { - LongMutable l = current.get(); + LongMutable l = (LongMutable) current.get(); if (l == null) { l = new LongMutable(); current.set(l); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java index 976dc51..cda4ff5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java @@ -24,9 +24,6 @@ import java.nio.ByteBuffer; */ public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>(); - public DoubleSerializer(DataType type) { } @@ -36,7 +33,7 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> { } private DoubleMutable current() { - DoubleMutable d = current.get(); + DoubleMutable d = (DoubleMutable) current.get(); if (d == null) { d = new DoubleMutable(); current.set(d); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java index 7b95505..c726ab6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java @@ -18,17 +18,14 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; - import org.apache.kylin.common.util.BytesUtil; +import java.nio.ByteBuffer; + /** */ public class Int4Serializer extends DataTypeSerializer<IntMutable> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<IntMutable> current = new ThreadLocal<IntMutable>(); - public Int4Serializer(DataType type) { } @@ -38,7 +35,7 @@ public class Int4Serializer extends DataTypeSerializer<IntMutable> { } private IntMutable current() { - IntMutable l = current.get(); + IntMutable l = (IntMutable) current.get(); if (l == null) { l = new IntMutable(); current.set(l); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java index fa333b2..186cdb9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java @@ -18,17 +18,14 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; - import org.apache.kylin.common.util.BytesUtil; +import java.nio.ByteBuffer; + /** */ public class Long8Serializer extends DataTypeSerializer<LongMutable> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); - public Long8Serializer(DataType type) { } @@ -38,7 +35,7 @@ public class Long8Serializer extends DataTypeSerializer<LongMutable> { } private LongMutable current() { - LongMutable l = current.get(); + LongMutable l = (LongMutable) current.get(); if (l == null) { l = new LongMutable(); current.set(l); http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java index 9306a70..d8f3f37 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java @@ -18,17 +18,14 @@ package org.apache.kylin.metadata.datatype; -import java.nio.ByteBuffer; - import org.apache.kylin.common.util.BytesUtil; +import java.nio.ByteBuffer; + /** */ public class LongSerializer extends DataTypeSerializer<LongMutable> { - // be thread-safe and avoid repeated obj creation - private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>(); - public LongSerializer(DataType type) { } @@ -38,7 +35,7 @@ public class LongSerializer extends DataTypeSerializer<LongMutable> { } private LongMutable current() { - LongMutable l = current.get(); + LongMutable l = (LongMutable) current.get(); if (l == null) { l = new LongMutable(); current.set(l);