KYLIN-2766 use Kylin working dir to get FileSystem

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4316cfdb
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4316cfdb
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4316cfdb

Branch: refs/heads/2622-2764
Commit: 4316cfdbe21cbab5764c2bcb6a45010bef434bf3
Parents: 891caab
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sun Aug 20 21:25:17 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Aug 22 20:52:45 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/common/HadoopCmdOutput.java    | 4 ++--
 .../hive/cardinality/HiveColumnCardinalityUpdateJob.java      | 3 ++-
 .../java/org/apache/kylin/storage/hbase/HBaseConnection.java  | 2 +-
 .../org/apache/kylin/storage/hbase/HBaseResourceStore.java    | 7 ++++---
 .../org/apache/kylin/storage/hbase/steps/CreateHTableJob.java | 3 ++-
 .../storage/hbase/steps/HDFSPathGarbageCollectionStep.java    | 2 +-
 .../apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java | 5 +++--
 7 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 11d6d2c..0bff511 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -22,11 +22,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
@@ -99,7 +99,7 @@ public class HadoopCmdOutput {
             mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
             rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
 
-            String fsScheme = FileSystem.get(job.getConfiguration()).getScheme();
+            String fsScheme = HadoopUtil.getWorkingFileSystem().getScheme();
             long bytesWritten = counters.findCounter(fsScheme, FileSystemCounter.BYTES_WRITTEN).getValue();
             if (bytesWritten == 0) {
                 bytesWritten = counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
index 246822c..24abe77 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableExtDesc;
@@ -125,7 +126,7 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
     }
 
     private static List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem();
         CompressionCodecFactory factory = new CompressionCodecFactory(conf);
         FileStatus[] items = fileSystem.listStatus(location);
         if (items == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 6580107..49fc6fa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -207,7 +207,7 @@ public class HBaseConnection {
         path = Path.getPathWithoutSchemeAndAuthority(path);
 
         try {
-            FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(getCurrentHBaseConfiguration());
             return fs.makeQualified(path).toString();
         } catch (IOException e) {
             throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index ccbcde8..710440e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -56,6 +56,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.StringEntity;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -247,7 +248,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+            FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());
 
             try {
                 return fileSystem.open(redirectPath);
@@ -340,7 +341,7 @@ public class HBaseResourceStore extends ResourceStore {
 
             if (hdfsResourceExist) { // remove hdfs cell value
                 Path redirectPath = bigCellHDFSPath(resPath);
-                FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+                FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());
 
                 if (fileSystem.exists(redirectPath)) {
                     fileSystem.delete(redirectPath, true);
@@ -388,7 +389,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        FileSystem fileSystem = FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration());
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration());
 
         if (fileSystem.exists(redirectPath)) {
             fileSystem.delete(redirectPath, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index fc52701..e1f8bb1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -274,7 +275,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         // note read-write separation, respect HBase FS here
         Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fs = FileSystem.get(hbaseConf);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(hbaseConf);
         if (fs.exists(outputFolder) == false) {
             fs.mkdirs(outputFolder);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 86e8e6b..981672c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -62,7 +62,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
             dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem());
 
             if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
-                dropHdfsPathOnCluster(toDeletePaths, FileSystem.get(HBaseConnection.getCurrentHBaseConfiguration()));
+                dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem(HBaseConnection.getCurrentHBaseConfiguration()));
             }
         } catch (IOException e) {
             logger.error("job:" + getId() + " execute finished with exception", e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4316cfdb/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index c437e66..110a51a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -86,7 +87,7 @@ public class DeployCoprocessorCLI {
 
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(hconf);
         Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl());
         Admin hbaseAdmin = conn.getAdmin();
 
@@ -217,7 +218,7 @@ public class DeployCoprocessorCLI {
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
+        FileSystem fileSystem = HadoopUtil.getWorkingFileSystem(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
         Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);

Reply via email to