This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 124dd05 KYLIN-4818 Not store HLL binary file into RDBMS directly 124dd05 is described below commit 124dd054923b9e41f37c725494ff1cadac496eee Author: XiaoxiangYu <x...@apache.org> AuthorDate: Thu Jan 7 20:43:55 2021 +0800 KYLIN-4818 Not store HLL binary file into RDBMS directly --- .../apache/kylin/common/persistence/ResourceStore.java | 17 ++++++++++++----- .../main/java/org/apache/kylin/cube/CubeSegment.java | 6 +++--- .../kylin/engine/spark/utils/UpdateMetadataUtil.java | 9 +++------ .../org/apache/kylin/engine/spark/job/CubeBuildJob.java | 9 +++------ .../apache/kylin/storage/hbase/HBaseResourceStore.java | 3 ++- 5 files changed, 23 insertions(+), 21 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 80d66af..65d4f59 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.common.persistence; @@ -138,7 +138,7 @@ abstract public class ResourceStore { StringEntity.serializer); } StringEntity entity = getResource(ResourceStore.METASTORE_UUID_TAG, StringEntity.serializer); - return entity == null ? 
"":entity.toString(); + return entity == null ? "" : entity.toString(); } /** @@ -361,6 +361,13 @@ abstract public class ResourceStore { return writer.bytesWritten(); } + final public void putBigResource(String resPath, InputStream content, long ts) throws IOException { + resPath = norm(resPath); + ContentWriter writer = ContentWriter.create(content); + writer.markBigContent(); + putResourceCheckpoint(resPath, writer, ts); + } + /** * Overwrite a resource without write conflict check * @return bytes written @@ -447,7 +454,7 @@ abstract public class ResourceStore { throws IOException, WriteConflictException; private long checkAndPutResourceWithRetry(final String resPath, final byte[] content, final long oldTS, - final long newTS) throws IOException, WriteConflictException { + final long newTS) throws IOException, WriteConflictException { ExponentialBackoffRetry retry = new ExponentialBackoffRetry(this); return retry.doWithRetry(() -> checkAndPutResourceImpl(resPath, content, oldTS, newTS)); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 706cd97..1cd3aa5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -541,7 +541,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { } public String getPreciseStatisticsResourcePath() { - return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "json"); + return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ""); } public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java index 5560a1c..0987842 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java @@ -40,7 +40,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeBuildTypeEnum; -import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.job.NSparkExecutable; @@ -50,8 +49,6 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS; - public class UpdateMetadataUtil { protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class); @@ -80,12 +77,12 @@ public class UpdateMetadataUtil { currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString())); String resKey = toUpdateSeg.getStatisticsResourcePath(); - String jobWorkingDirPath = 
JobBuilderSupport.getJobWorkingDir(currentInstanceCopy.getConfig().getHdfsWorkingDirectory(), nsparkExecutable.getParam(MetadataConstants.P_JOB_ID)); - Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + String jobTmpDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID); + Path statisticsFile = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + segmentId + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); FileSystem fs = HadoopUtil.getWorkingFileSystem(); if (fs.exists(statisticsFile)) { FSDataInputStream is = fs.open(statisticsFile); - ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis()); + ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis()); } CubeUpdate update = new CubeUpdate(currentInstanceCopy); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index 89ecad4..51f9f2c 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -85,8 +84,6 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets; import scala.Tuple2; import scala.collection.JavaConversions; -import static 
org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS; - public class CubeBuildJob extends SparkApplication { protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class); protected static String TEMP_DIR_SUFFIX = "_temp"; @@ -119,7 +116,7 @@ public class CubeBuildJob extends SparkApplication { cubeManager = CubeManager.getInstance(config); cubeInstance = cubeManager.getCubeByUuid(cubeName); CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId); - SpanningTree spanningTree ; + SpanningTree spanningTree; ParentSourceChooser sourceChooser; // Cuboid Statistics is served for Cube Planner Phase One at the moment @@ -140,8 +137,8 @@ public class CubeBuildJob extends SparkApplication { logger.info("Cuboid statistics return {} records and cost {} ms.", hllMap.size(), (System.currentTimeMillis() - startMills)); // 1.2 Save cuboid statistics - String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId); - Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS); + String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId; + Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + "/"); Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate)); long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L); CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc); diff --git a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index e5a2595..ab077d8 100644 --- a/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ 
b/metastore-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -61,6 +61,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +@Deprecated public class HBaseResourceStore extends PushdownResourceStore { private static Logger logger = LoggerFactory.getLogger(HBaseResourceStore.class); @@ -147,7 +148,7 @@ public class HBaseResourceStore extends PushdownResourceStore { @Override protected void visitFolderImpl(String folderPath, final boolean recursive, VisitFilter filter, - final boolean loadContent, final Visitor visitor) throws IOException { + final boolean loadContent, final Visitor visitor) throws IOException { visitFolder(folderPath, filter, loadContent, new FolderVisitor() { @Override