KYLIN-2351 enforce Path scheme
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee020cc6 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee020cc6 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee020cc6 Branch: refs/heads/yang22-hbase102 Commit: ee020cc69bef3a263e348b594ddae77ca8390efe Parents: 4f6515d Author: Li Yang <liy...@apache.org> Authored: Thu Jan 5 11:26:02 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Jan 5 13:24:38 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/engine/mr/HadoopUtil.java | 23 +++++++++++++++----- .../engine/mr/steps/MergeStatisticsStep.java | 2 +- .../steps/RowKeyDistributionCheckerMapper.java | 3 ++- .../kylin/engine/mr/SortedColumnReaderTest.java | 20 ++++++++++++----- 4 files changed, 35 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java index 3d29a02..3119c1e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java @@ -72,17 +72,30 @@ public class HadoopUtil { } public static FileSystem getWorkingFileSystem(Configuration conf) throws IOException { - return getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), conf); + Path workingPath = new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); + return getFileSystem(workingPath, conf); } public static FileSystem getFileSystem(String path) throws IOException { - return getFileSystem(path, getCurrentConfiguration()); + return getFileSystem(new 
Path(makeURI(path))); } - - static FileSystem getFileSystem(String path, Configuration conf) throws IOException { - return FileSystem.get(makeURI(path), conf); + + public static FileSystem getFileSystem(Path path) throws IOException { + Configuration conf = getCurrentConfiguration(); + return getFileSystem(path, conf); } + public static FileSystem getFileSystem(Path path, Configuration conf) { + if (StringUtils.isBlank(path.toUri().getScheme())) + throw new IllegalArgumentException("Path must be qualified: " + path); + + try { + return path.getFileSystem(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public static URI makeURI(String filePath) { try { return new URI(fixWindowsPath(filePath)); http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java index 88f6ba2..af86181 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java @@ -123,7 +123,7 @@ public class MergeStatisticsStep extends AbstractExecutable { averageSamplingPercentage = averageSamplingPercentage / CubingExecutableUtil.getMergingSegmentIds(this.getParams()).size(); CubeStatsWriter.writeCuboidStatistics(conf, new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams())), cuboidHLLMap, averageSamplingPercentage); Path statisticsFilePath = new Path(CubingExecutableUtil.getMergedStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); - FileSystem fs = statisticsFilePath.getFileSystem(conf); + FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf); FSDataInputStream 
is = fs.open(statisticsFilePath); try { // put the statistics to metadata store http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java index fca91a6..ee8da6b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java @@ -33,6 +33,7 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.KylinMapper; /** @@ -89,7 +90,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex List<byte[]> rowkeyList = new ArrayList<byte[]>(); SequenceFile.Reader reader = null; try { - reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf); + reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); while (reader.next(key, value)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/ee020cc6/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java index 3c4195f..be440f6 100644 --- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/SortedColumnReaderTest.java @@ -48,10 +48,9 @@ public class SortedColumnReaderTest { @Test public void testReadStringMultiFile() throws Exception { String dirPath = "src/test/resources/multi_file_str"; - StringBytesConverter converter = new StringBytesConverter(); ArrayList<String> correctAnswer = readAllFiles(dirPath); Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter())); - SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1); ArrayList<String> output = new ArrayList<>(); while (e.moveNext()) { @@ -124,7 +123,7 @@ public class SortedColumnReaderTest { } } }); - SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("long")); + SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("long")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1); ArrayList<String> output = new ArrayList<>(); while (e.moveNext()) { @@ -142,7 +141,7 @@ public class SortedColumnReaderTest { public void testEmptyDir() throws Exception { String dirPath = "src/test/resources/empty_dir"; new File(dirPath).mkdirs(); - SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1); ArrayList<String> output = new ArrayList<>(); while (e.moveNext()) { @@ -159,7 +158,7 @@ public class SortedColumnReaderTest { final BytesConverter<String> converter = new 
StringBytesConverter(); Collections.sort(correctAnswer, new ByteComparator<String>(new StringBytesConverter())); System.out.println("correct answer:" + correctAnswer); - SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("varchar")); + SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("varchar")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1); ArrayList<String> output = new ArrayList<>(); while (e.moveNext()) { @@ -230,7 +229,7 @@ public class SortedColumnReaderTest { } } }); - SortedColumnDFSFile column = new SortedColumnDFSFile(dirPath + "/", DataType.getType("double")); + SortedColumnDFSFile column = new SortedColumnDFSFile(qualify(dirPath + "/"), DataType.getType("double")); IDictionaryValueEnumerator e = new TableColumnValueEnumerator(column.getReader(), -1); ArrayList<String> output = new ArrayList<>(); while (e.moveNext()) { @@ -300,4 +299,13 @@ public class SortedColumnReaderTest { } return result; } + + private String qualify(String path) { + String absolutePath = new File(path).getAbsolutePath(); + if (absolutePath.startsWith("/")) + return "file://" + absolutePath; + else + return "file:///" + absolutePath; + } + }