KYLIN-2766 use Kylin working dir to get FileSystem
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4316cfdb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4316cfdb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4316cfdb

Branch: refs/heads/2622-2764
Commit: 4316cfdbe21cbab5764c2bcb6a45010bef434bf3
Parents: 891caab
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sun Aug 20 21:25:17 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Aug 22 20:52:45 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/common/HadoopCmdOutput.java | 4 ++--
 .../hive/cardinality/HiveColumnCardinalityUpdateJob.java | 3 ++-
 .../java/org/apache/kylin/storage/hbase/HBaseConnection.java | 2 +-
 .../org/apache/kylin/storage/hbase/HBaseResourceStore.java | 7 ++++---
 .../org/apache/kylin/storage/hbase/steps/CreateHTableJob.java | 3 ++-
 .../storage/hbase/steps/HDFSPathGarbageCollectionStep.java | 2 +-
 .../apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java | 5 +++--
 7 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 11d6d2c..0bff511 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -22,11 +22,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;

-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -99,7 +99,7 @@ public class HadoopCmdOutput {
             mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
             rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());

-            String fsScheme = FileSystem.get(job.getConfiguration()).getScheme();
+            String fsScheme = HadoopUtil.getWorkingFileSystem().getScheme();
             long bytesWritten = counters.findCounter(fsScheme, FileSystemCounter.BYTES_WRITTEN).getValue();
             if (bytesWritten == 0) {
                 bytesWritten = counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
index 246822c..24abe77 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -125,7 +126,7 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
     }

     private static List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
         CompressionCodecFactory factory = new CompressionCodecFactory(conf);
         FileStatus[] items = fileSystem.listStatus(location);
         if (items == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 6580107..49fc6fa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -207,7 +207,7 @@ public class HBaseConnection {
         path = Path.getPathWithoutSchemeAndAuthority(path);

         try {
-            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(getCurrentHBaseConfiguration());
             return fs.makeQualified(path).toString();
         } catch (IOException e) {
             throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index ccbcde8..710440e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -56,6 +56,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.StringEntity;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -247,7 +248,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());

             try {
                 return fileSystem.open(redirectPath);
@@ -340,7 +341,7 @@ public class HBaseResourceStore extends ResourceStore {

             if (hdfsResourceExist) { // remove hdfs cell value
                 Path redirectPath = bigCellHDFSPath(resPath);
-                FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+                FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());

                 if (fileSystem.exists(redirectPath)) {
                     fileSystem.delete(redirectPath, true);
@@ -388,7 +389,7 @@ public class HBaseResourceStore extends ResourceStore {

     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());

         if (fileSystem.exists(redirectPath)) {
             fileSystem.delete(redirectPath, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index fc52701..e1f8bb1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -274,7 +275,7 @@ public class CreateHTableJob extends AbstractHadoopJob {

         // note read-write separation, respect HBase FS here
         Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fs = FileSystem.get(hbaseConf);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(hbaseConf);
         if (fs.exists(outputFolder) == false) {
             fs.mkdirs(outputFolder);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 86e8e6b..981672c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -62,7 +62,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
             dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem());

             if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
-                dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration()));
+                dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration()));
             }
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index c437e66..110a51a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -86,7 +87,7 @@ public class DeployCoprocessorCLI {

         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(hconf);
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
         Admin hbaseAdmin = conn.getAdmin();

@@ -217,7 +218,7 @@ public class DeployCoprocessorCLI {
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(hconf);

         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
         Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);