KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4d9a9231
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4d9a9231
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4d9a9231

Branch: refs/heads/KYLIN-2006
Commit: 4d9a92319ae6f0f778328f06d153cc6a7c9c93a8
Parents: dd496a6
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Nov 8 13:54:35 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 35 +++++++-------------
 1 file changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d9a9231/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f7af42e..d285799 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -18,22 +18,17 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -41,6 +36,7 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,26 +82,19 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
+        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
+        final ReadableTable.TableReader tableReader = readableTable.getReader();
+        String minValue = null, maxValue = null;
         try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
+            while (tableReader.next()) {
+                if (minValue == null) {
+                    minValue = tableReader.getRow()[0];
+                }
+                maxValue = tableReader.getRow()[0];
             }
-        } catch (IOException e) {
-            throw e;
         } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
+            IOUtils.closeQuietly(tableReader);
         }
 
         final DataType partitionColType = partitionCol.getType();