KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4d9a9231
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4d9a9231
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4d9a9231

Branch: refs/heads/KYLIN-2006
Commit: 4d9a92319ae6f0f778328f06d153cc6a7c9c93a8
Parents: dd496a6
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Nov 8 13:54:35 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  | 35 +++++++-------------
 1 file changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4d9a9231/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f7af42e..d285799 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -18,22 +18,17 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -41,6 +36,7 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,26 +82,19 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
+        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
+        final ReadableTable.TableReader tableReader = readableTable.getReader();
+        String minValue = null, maxValue = null;
         try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
+            while (tableReader.next()) {
+                if (minValue == null) {
+                    minValue = tableReader.getRow()[0];
+                }
+                maxValue = tableReader.getRow()[0];
             }
-        } catch (IOException e) {
-            throw e;
         } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
+            IOUtils.closeQuietly(tableReader);
         }
 
         final DataType partitionColType = partitionCol.getType();

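For readability, here is a sketch of updateTimeRange() as it reads after this patch, reconstructed from the hunks above. The date-parsing logic below the last context line is unchanged by this commit and only summarized in a comment; treat this as an illustration rather than the committed file.

    private void updateTimeRange(CubeSegment segment) throws IOException {
        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
        // the partition column's distinct values are now read through the ReadableTable
        // abstraction instead of a raw HDFS stream (column count -1 as in the diff above)
        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
        final ReadableTable.TableReader tableReader = readableTable.getReader();
        String minValue = null, maxValue = null;
        try {
            // the first row read is taken as the min and the last as the max,
            // mirroring the old BufferedReader logic
            while (tableReader.next()) {
                if (minValue == null) {
                    minValue = tableReader.getRow()[0];
                }
                maxValue = tableReader.getRow()[0];
            }
        } finally {
            IOUtils.closeQuietly(tableReader);
        }

        final DataType partitionColType = partitionCol.getType();
        // ... unchanged: parse minValue/maxValue according to partitionColType
        // (DateFormat / FastDateFormat) and update the segment's date range
    }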