KYLIN-2144 move useful operation tools to org.apache.kylin.tool
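Summary: the cleanup and migration CLIs are copied into the kylin-tool module, and the old classes under org.apache.kylin.engine.mr.steps and org.apache.kylin.storage.hbase.util are kept but marked @Deprecated with a warning that points to the new package. As a rough sketch of how the relocated tools are invoked after this change (assuming KYLIN_HOME is set; the "--delete true" flag is the one the smoke test already passes):

    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataCleanupJob --delete true
    ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true

"metastore.sh clean" forwards its remaining arguments to the first command, so existing scripts keep working.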
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7358a78a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7358a78a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7358a78a Branch: refs/heads/master Commit: 7358a78a482a9de6a255ad8334fc0eeac9213be3 Parents: 2dffc1d Author: Hongbin Ma <mahong...@apache.org> Authored: Tue Nov 1 17:48:37 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Nov 3 13:34:12 2016 +0800 ---------------------------------------------------------------------- build/bin/metastore.sh | 2 +- build/smoke-test/smoke-test.sh | 2 +- .../engine/mr/steps/MetadataCleanupJob.java | 3 + kylin-it/pom.xml | 4 + .../kylin/provision/BuildCubeWithEngine.java | 2 +- .../kylin/provision/BuildCubeWithStream.java | 2 +- server-base/pom.xml | 4 + .../apache/kylin/rest/service/AdminService.java | 2 +- .../storage/hbase/util/CubeMigrationCLI.java | 2 + .../hbase/util/CubeMigrationCheckCLI.java | 2 + .../hbase/util/ExtendCubeToHybridCLI.java | 4 + .../storage/hbase/util/HiveCmdBuilder.java | 1 + .../storage/hbase/util/StorageCleanupJob.java | 3 + .../org/apache/kylin/tool/CubeMigrationCLI.java | 586 +++++++++++++++++++ .../kylin/tool/CubeMigrationCheckCLI.java | 213 +++++++ .../kylin/tool/ExtendCubeToHybridCLI.java | 261 +++++++++ .../apache/kylin/tool/MetadataCleanupJob.java | 180 ++++++ .../apache/kylin/tool/StorageCleanupJob.java | 364 ++++++++++++ 18 files changed, 1632 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/build/bin/metastore.sh ---------------------------------------------------------------------- diff --git a/build/bin/metastore.sh b/build/bin/metastore.sh index baf3d7a..a7a9e27 100755 --- a/build/bin/metastore.sh +++ b/build/bin/metastore.sh @@ -95,7 +95,7 @@ then elif [ "$1" == "clean" ] then - ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.engine.mr.steps.MetadataCleanupJob "${@:2}" + ${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.MetadataCleanupJob "${@:2}" else echo "usage: metastore.sh backup" http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/build/smoke-test/smoke-test.sh ---------------------------------------------------------------------- diff --git a/build/smoke-test/smoke-test.sh b/build/smoke-test/smoke-test.sh index c21bd6d..f174b47 100755 --- a/build/smoke-test/smoke-test.sh +++ b/build/smoke-test/smoke-test.sh @@ -72,6 +72,6 @@ cd - # Tear down stage ${KYLIN_HOME}/bin/metastore.sh clean --delete true -${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.storage.hbase.util.StorageCleanupJob --delete true +${KYLIN_HOME}/bin/kylin.sh org.apache.kylin.tool.StorageCleanupJob --delete true ${KYLIN_HOME}/bin/metastore.sh reset ${KYLIN_HOME}/bin/kylin.sh stop http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java index f2b1d6b..eee2c00 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java @@ -43,6 +43,7 @@ import com.google.common.collect.Sets; /** */ +@Deprecated public class MetadataCleanupJob extends 
AbstractHadoopJob { @SuppressWarnings("static-access") @@ -174,6 +175,8 @@ public class MetadataCleanupJob extends AbstractHadoopJob { } public static void main(String[] args) throws Exception { + logger.warn("org.apache.kylin.engine.mr.steps.MetadataCleanupJob is deprecated, use org.apache.kylin.tool.MetadataCleanupJob instead"); + int exitCode = ToolRunner.run(new MetadataCleanupJob(), args); System.exit(exitCode); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/kylin-it/pom.xml ---------------------------------------------------------------------- diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index 43e47c9..3bde97c 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -61,6 +61,10 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> + <artifactId>kylin-tool</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> <artifactId>kylin-storage-hbase</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 499c6cc..156b1f6 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -59,8 +59,8 @@ import org.apache.kylin.source.ISource; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.apache.kylin.tool.StorageCleanupJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 9fd6c52..9804292 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -62,8 +62,8 @@ import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.apache.kylin.tool.StorageCleanupJob; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/server-base/pom.xml ---------------------------------------------------------------------- diff --git a/server-base/pom.xml b/server-base/pom.xml index 67013e4..56a50ef 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -62,6 +62,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-source-kafka</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-tool</artifactId> + </dependency> <dependency> 
<groupId>net.sf.ehcache</groupId> http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java index 1a94967..ace0388 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java @@ -28,7 +28,7 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.InternalErrorException; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.apache.kylin.tool.StorageCleanupJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 52aa7ea..dcf1690 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory; * Note that different envs are assumed to share the same hadoop cluster, * including hdfs, hbase and hive. 
*/ +@Deprecated public class CubeMigrationCLI { private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class); @@ -93,6 +94,7 @@ public class CubeMigrationCLI { private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; public static void main(String[] args) throws IOException, InterruptedException { + logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead"); if (args.length != 8) { usage(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java index 295750a..8bd4abf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java @@ -49,6 +49,7 @@ import com.google.common.collect.Lists; * for all of cube segments' corresponding HTables after migrating a cube * <p/> */ +@Deprecated public class CubeMigrationCheckCLI { private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCheckCLI.class); @@ -68,6 +69,7 @@ public class CubeMigrationCheckCLI { private boolean ifFix = false; public static void main(String[] args) throws ParseException, IOException { + logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCheckCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCheckCLI instead"); OptionsHelper optionsHelper = new OptionsHelper(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index b883df2..8d0cb82 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -60,7 +60,9 @@ import com.google.common.collect.Lists; /** * Created by dongli on 12/29/15. 
+ * */ +@Deprecated public class ExtendCubeToHybridCLI { public static final String ACL_INFO_FAMILY = "i"; private static final String CUBE_POSTFIX = "_old"; @@ -83,6 +85,8 @@ public class ExtendCubeToHybridCLI { } public static void main(String[] args) throws Exception { + logger.warn("org.apache.kylin.storage.hbase.util.ExtendCubeToHybridCLI is deprecated, use org.apache.kylin.tool.ExtendCubeToHybridCLI instead"); + if (args.length != 2 && args.length != 3) { System.out.println("Usage: ExtendCubeToHybridCLI project cube [partition_date]"); return; http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java index c435f34..9252254 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HiveCmdBuilder.java @@ -34,6 +34,7 @@ import com.google.common.collect.Lists; /** * Created by dongli on 2/29/16. */ +@Deprecated public class HiveCmdBuilder { private static final Logger logger = LoggerFactory.getLogger(HiveCmdBuilder.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index dcd90e9..e66eaec 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -58,6 +58,7 @@ import org.apache.kylin.metadata.realization.IRealizationConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Deprecated public class StorageCleanupJob extends AbstractApplication { @SuppressWarnings("static-access") @@ -349,6 +350,8 @@ public class StorageCleanupJob extends AbstractApplication { } public static void main(String[] args) throws Exception { + logger.warn("org.apache.kylin.storage.hbase.util.StorageCleanupJob is deprecated, use org.apache.kylin.tool.StorageCleanupJob instead"); + StorageCleanupJob cli = new StorageCleanupJob(); cli.execute(args); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java new file mode 100644 index 0000000..46f8d75 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.restclient.RestClient; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.lookup.SnapshotManager; +import org.apache.kylin.dict.lookup.SnapshotTable; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p/> + * This tool serves for the purpose of migrating cubes. e.g. upgrade cube from + * dev env to test(prod) env, or vice versa. + * <p/> + * Note that different envs are assumed to share the same hadoop cluster, + * including hdfs, hbase and hive. 
+ */ +public class CubeMigrationCLI { + + private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCLI.class); + + private static List<Opt> operations; + private static KylinConfig srcConfig; + private static KylinConfig dstConfig; + private static ResourceStore srcStore; + private static ResourceStore dstStore; + private static FileSystem hdfsFS; + private static HBaseAdmin hbaseAdmin; + + public static final String ACL_INFO_FAMILY = "i"; + private static final String ACL_TABLE_NAME = "_acl"; + private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; + + public static void main(String[] args) throws IOException, InterruptedException { + + if (args.length != 8) { + usage(); + System.exit(1); + } + + moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); + } + + private static void usage() { + System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); + System.out.println(" srcKylinConfigUri: The KylinConfig of the cube's source \n" + "dstKylinConfigUri: The KylinConfig of the cube's new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); + + } + + public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { + + srcConfig = srcCfg; + srcStore = ResourceStore.getStore(srcConfig); + dstConfig = dstCfg; + dstStore = ResourceStore.getStore(dstConfig); + + CubeManager cubeManager = CubeManager.getInstance(srcConfig); + CubeInstance cube = cubeManager.getCube(cubeName); + logger.info("cube to be moved is : " + cubeName); + + if (cube.getStatus() != RealizationStatusEnum.READY) + throw new IllegalStateException("Cannot migrate cube that is not in READY state."); + + for (CubeSegment segment : cube.getSegments()) { + if (segment.getStatus() != SegmentStatusEnum.READY) { + throw new IllegalStateException("At least one segment is not in READY state"); + } + } + + checkAndGetHbaseUrl(); + + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + hbaseAdmin = new HBaseAdmin(conf); + + hdfsFS = FileSystem.get(new Configuration()); + + operations = new ArrayList<Opt>(); + + copyFilesInMetaStore(cube, overwriteIfExists); + renameFoldersInHdfs(cube); + changeHtableHost(cube); + addCubeAndModelIntoProject(cube, cubeName, projectName); + if (Boolean.parseBoolean(copyAcl) == true) { + copyACL(cube, projectName); + } + + if (Boolean.parseBoolean(purgeAndDisable) == true) { + purgeAndDisable(cubeName); // this should be the last action + } + + if (realExecute.equalsIgnoreCase("true")) { + doOpts(); + checkMigrationSuccess(dstConfig, cubeName, true); + updateMeta(dstConfig); + } else { + showOpts(); + } + } + + public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException,
InterruptedException { + + moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); + } + + public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { + CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); + checkCLI.execute(cubeName); + } + + private static String checkAndGetHbaseUrl() { + String srcMetadataUrl = srcConfig.getMetadataUrl(); + String dstMetadataUrl = dstConfig.getMetadataUrl(); + + logger.info("src metadata url is " + srcMetadataUrl); + logger.info("dst metadata url is " + dstMetadataUrl); + + int srcIndex = srcMetadataUrl.toLowerCase().indexOf("hbase"); + int dstIndex = dstMetadataUrl.toLowerCase().indexOf("hbase"); + if (srcIndex < 0 || dstIndex < 0) + throw new IllegalStateException("Both metadata urls should be hbase metadata url"); + + String srcHbaseUrl = srcMetadataUrl.substring(srcIndex).trim(); + String dstHbaseUrl = dstMetadataUrl.substring(dstIndex).trim(); + if (!srcHbaseUrl.equalsIgnoreCase(dstHbaseUrl)) { + throw new IllegalStateException("hbase url not equal! "); + } + + logger.info("hbase url is " + srcHbaseUrl.trim()); + return srcHbaseUrl.trim(); + } + + private static void renameFoldersInHdfs(CubeInstance cube) { + for (CubeSegment segment : cube.getSegments()) { + + String jobUuid = segment.getLastBuildJobID(); + String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid); + String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid); + + operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt })); + } + + } + + private static void changeHtableHost(CubeInstance cube) { + for (CubeSegment segment : cube.getSegments()) { + operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); + } + } + + private static void copyACL(CubeInstance cube, String projectName) { + operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName })); + } + + private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException { + + List<String> metaItems = new ArrayList<String>(); + Set<String> dictAndSnapshot = new HashSet<String>(); + listCubeRelatedResources(cube, metaItems, dictAndSnapshot); + + if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true")) + throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. 
Use overwriteIfExists to overwrite it"); + + for (String item : metaItems) { + operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); + } + + for (String item : dictAndSnapshot) { + operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); + } + } + private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException { + String projectResPath = ProjectInstance.concatResourcePath(projectName); + if (!dstStore.exists(projectResPath)) + throw new IllegalStateException("The target project " + projectName + "does not exist"); + + operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, projectName })); + } + + private static void purgeAndDisable(String cubeName) throws IOException { + operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); + } + + private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException { + + CubeDesc cubeDesc = cube.getDescriptor(); + metaResource.add(cube.getResourcePath()); + metaResource.add(cubeDesc.getResourcePath()); + metaResource.add(DataModelDesc.concatResourcePath(cubeDesc.getModelName())); + + for (String table : cubeDesc.getModel().getAllTables()) { + metaResource.add(TableDesc.concatResourcePath(table.toUpperCase())); + } + + for (CubeSegment segment : cube.getSegments()) { + metaResource.add(segment.getStatisticsResourcePath()); + dictAndSnapshot.addAll(segment.getSnapshotPaths()); + dictAndSnapshot.addAll(segment.getDictionaryPaths()); + } + } + + private static enum OptType { + COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, COPY_ACL, PURGE_AND_DISABLE + } + + private static class Opt { + private OptType type; + private Object[] params; + + private Opt(OptType type, Object[] params) { + this.type = type; + this.params = params; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(type).append(":"); + for (Object s : params) + sb.append(s).append(", "); + return sb.toString(); + } + + } + + private static void showOpts() { + for (int i = 0; i < operations.size(); ++i) { + showOpt(operations.get(i)); + } + } + + private static void showOpt(Opt opt) { + logger.info("Operation: " + opt.toString()); + } + + private static void doOpts() throws IOException, InterruptedException { + int index = 0; + try { + for (; index < operations.size(); ++index) { + logger.info("Operation index :" + index); + doOpt(operations.get(index)); + } + } catch (Exception e) { + logger.error("error met", e); + logger.info("Try undoing previous changes"); + // undo: + for (int i = index; i >= 0; --i) { + try { + undo(operations.get(i)); + } catch (Exception ee) { + logger.error("error met ", e); + logger.info("Continue undoing..."); + } + } + + throw new RuntimeException("Cube moving failed"); + } + } + + @SuppressWarnings("checkstyle:methodlength") + private static void doOpt(Opt opt) throws IOException, InterruptedException { + logger.info("Executing operation: " + opt.toString()); + + switch (opt.type) { + case CHANGE_HTABLE_HOST: { + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(tableName); + desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); + hbaseAdmin.modifyTable(tableName, desc); + hbaseAdmin.enableTable(tableName); 
+ logger.info("CHANGE_HTABLE_HOST is completed"); + break; + } + case COPY_FILE_IN_META: { + String item = (String) opt.params[0]; + RawResource res = srcStore.getResource(item); + dstStore.putResource(item, res.inputStream, res.timestamp); + res.inputStream.close(); + logger.info("Item " + item + " is copied"); + break; + } + case COPY_DICT_OR_SNAPSHOT: { + String item = (String) opt.params[0]; + + if (item.toLowerCase().endsWith(".dict")) { + DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig); + DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig); + DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item); + + long ts = dictSrc.getLastModified(); + dictSrc.setLastModified(0);//to avoid resource store write conflict + Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig); + DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc); + dictSrc.setLastModified(ts); + + if (dictSaved == dictSrc) { + //no dup found, already saved to dest + logger.info("Item " + item + " is copied"); + } else { + //dictSrc is rejected because of duplication + //modify cube's dictionary path + String cubeName = (String) opt.params[1]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + for (CubeSegment segment : cube.getSegments()) { + for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { + if (entry.getValue().equalsIgnoreCase(item)) { + entry.setValue(dictSaved.getResourcePath()); + } + } + } + dstStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused"); + } + + } else if (item.toLowerCase().endsWith(".snapshot")) { + SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig); + SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig); + SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item); + + long ts = snapSrc.getLastModified(); + snapSrc.setLastModified(0); + SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc); + snapSrc.setLastModified(ts); + + if (snapSaved == snapSrc) { + //no dup found, already saved to dest + logger.info("Item " + item + " is copied"); + + } else { + String cubeName = (String) opt.params[1]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = dstStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + for (CubeSegment segment : cube.getSegments()) { + for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) { + if (entry.getValue().equalsIgnoreCase(item)) { + entry.setValue(snapSaved.getResourcePath()); + } + } + } + dstStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused"); + + } + + } else { + logger.error("unknown item found: " + item); + logger.info("ignore it"); + } + + break; + } + case RENAME_FOLDER_IN_HDFS: { + String srcPath = (String) opt.params[0]; + String dstPath = (String) opt.params[1]; + renameHDFSPath(srcPath, dstPath); + logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); + break; + } + case ADD_INTO_PROJECT: { + CubeInstance srcCube = 
(CubeInstance) opt.params[0]; + String cubeName = (String) opt.params[1]; + String projectName = (String) opt.params[2]; + String modelName = srcCube.getDescriptor().getModelName(); + + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); + + project.addModel(modelName); + project.removeRealization(RealizationType.CUBE, cubeName); + project.addRealizationEntry(RealizationType.CUBE, cubeName); + + dstStore.putResource(projectResPath, project, projectSerializer); + logger.info("Project instance for " + projectName + " is corrected"); + break; + } + case COPY_ACL: { + String cubeId = (String) opt.params[0]; + String modelId = (String) opt.params[1]; + String projectName = (String) opt.params[2]; + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); + String projUUID = project.getUuid(); + HTableInterface srcAclHtable = null; + HTableInterface destAclHtable = null; + try { + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + + // cube acl + Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); + if (result.listCells() != null) { + for (Cell cell : result.listCells()) { + byte[] family = CellUtil.cloneFamily(cell); + byte[] column = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + + // use the target project uuid as the parent + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + value = Bytes.toBytes(valueString); + } + Put put = new Put(Bytes.toBytes(cubeId)); + put.add(family, column, value); + destAclHtable.put(put); + } + } + destAclHtable.flushCommits(); + } finally { + IOUtils.closeQuietly(srcAclHtable); + IOUtils.closeQuietly(destAclHtable); + } + break; + } + case PURGE_AND_DISABLE: { + String cubeName = (String) opt.params[0]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = srcStore.getResource(cubeResPath, CubeInstance.class, cubeSerializer); + cube.getSegments().clear(); + cube.setStatus(RealizationStatusEnum.DISABLED); + srcStore.putResource(cubeResPath, cube, cubeSerializer); + logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); + + break; + } + default: { + //do nothing + break; + } + } + } + + private static void undo(Opt opt) throws IOException, InterruptedException { + logger.info("Undo operation: " + opt.toString()); + + switch (opt.type) { + case CHANGE_HTABLE_HOST: { + String tableName = (String) opt.params[0]; + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + hbaseAdmin.disableTable(tableName); + desc.setValue(IRealizationConstants.HTableTag, 
srcConfig.getMetadataUrlPrefix()); + hbaseAdmin.modifyTable(tableName, desc); + hbaseAdmin.enableTable(tableName); + break; + } + case COPY_FILE_IN_META: { + // no harm + logger.info("Undo for COPY_FILE_IN_META is ignored"); + break; + } + case COPY_DICT_OR_SNAPSHOT: { + // no harm + logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); + break; + } + case RENAME_FOLDER_IN_HDFS: { + String srcPath = (String) opt.params[1]; + String dstPath = (String) opt.params[0]; + + if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) { + renameHDFSPath(srcPath, dstPath); + logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); + } + break; + } + case ADD_INTO_PROJECT: { + logger.info("Undo for ADD_INTO_PROJECT is ignored"); + break; + } + case COPY_ACL: { + String cubeId = (String) opt.params[0]; + String modelId = (String) opt.params[1]; + HTableInterface destAclHtable = null; + try { + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + + destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); + destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); + destAclHtable.flushCommits(); + } finally { + IOUtils.closeQuietly(destAclHtable); + } + break; + } + case PURGE_AND_DISABLE: { + logger.info("Undo for PURGE_AND_DISABLE is not supported"); + break; + } + default: { + //do nothing + break; + } + } + } + + private static void updateMeta(KylinConfig config){ + String[] nodes = config.getRestServers(); + for (String node : nodes) { + RestClient restClient = new RestClient(node); + try { + logger.info("update meta cache for " + node); + restClient.wipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL); + } catch (IOException e) { + logger.error(e.getMessage()); + } + } + } + + private static void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException { + int nRetry = 0; + int sleepTime = 5000; + while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) { + ++nRetry; + if (nRetry > 3) { + throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath); + } else { + Thread.sleep(sleepTime * nRetry * nRetry); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java new file mode 100644 index 0000000..fe348ba --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCheckCLI.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * <p/> + * This tool serves for the purpose of + * checking the "KYLIN_HOST" property to be consistent with the dst's MetadataUrlPrefix + * for all of cube segments' corresponding HTables after migrating a cube + * <p/> + */ +public class CubeMigrationCheckCLI { + + private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCheckCLI.class); + + private static final Option OPTION_FIX = OptionBuilder.withArgName("fix").hasArg().isRequired(false).withDescription("Fix the inconsistent cube segments' HOST").create("fix"); + + private static final Option OPTION_DST_CFG_URI = OptionBuilder.withArgName("dstCfgUri").hasArg().isRequired(false).withDescription("The KylinConfig of the cube's new home").create("dstCfgUri"); + + private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false).withDescription("The name of cube migrated").create("cube"); + + private KylinConfig dstCfg; + private HBaseAdmin hbaseAdmin; + + private List<String> issueExistHTables; + private List<String> inconsistentHTables; + + private boolean ifFix = false; + + public static void main(String[] args) throws ParseException, IOException { + + OptionsHelper optionsHelper = new OptionsHelper(); + + Options options = new Options(); + options.addOption(OPTION_FIX); + options.addOption(OPTION_DST_CFG_URI); + options.addOption(OPTION_CUBE); + + boolean ifFix = false; + String dstCfgUri; + String cubeName; + logger.info("jobs args: " + Arrays.toString(args)); + try { + + optionsHelper.parseOptions(options, args); + + logger.info("options: '" + options.toString() + "'"); + logger.info("option value 'fix': '" + optionsHelper.getOptionValue(OPTION_FIX) + "'"); + ifFix = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FIX)); + + logger.info("option value 'dstCfgUri': '" + optionsHelper.getOptionValue(OPTION_DST_CFG_URI) + "'"); + dstCfgUri = optionsHelper.getOptionValue(OPTION_DST_CFG_URI); + + logger.info("option value 'cube': '" + optionsHelper.getOptionValue(OPTION_CUBE) + "'"); + cubeName = optionsHelper.getOptionValue(OPTION_CUBE); + + } catch (ParseException e) { + optionsHelper.printUsage(CubeMigrationCheckCLI.class.getName(), options); + throw e; + } + + KylinConfig kylinConfig; + if (dstCfgUri == null) { + kylinConfig = KylinConfig.getInstanceFromEnv(); + } else { + kylinConfig = KylinConfig.createInstanceFromUri(dstCfgUri); + } + + CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); +
checkCLI.execute(cubeName); + } + + public void execute() throws IOException { + execute(null); + } + + public void execute(String cubeName) throws IOException { + if (cubeName == null) { + checkAll(); + } else { + checkCube(cubeName); + } + fixInconsistent(); + printIssueExistingHTables(); + } + + public CubeMigrationCheckCLI(KylinConfig kylinConfig, Boolean isFix) throws IOException { + this.dstCfg = kylinConfig; + this.ifFix = isFix; + + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + hbaseAdmin = new HBaseAdmin(conf); + + issueExistHTables = Lists.newArrayList(); + inconsistentHTables = Lists.newArrayList(); + } + + public void checkCube(String cubeName) { + List<String> segFullNameList = Lists.newArrayList(); + + CubeInstance cube = CubeManager.getInstance(dstCfg).getCube(cubeName); + addHTableNamesForCube(cube, segFullNameList); + + check(segFullNameList); + } + + public void checkAll() { + List<String> segFullNameList = Lists.newArrayList(); + + CubeManager cubeMgr = CubeManager.getInstance(dstCfg); + for (CubeInstance cube : cubeMgr.listAllCubes()) { + addHTableNamesForCube(cube, segFullNameList); + } + + check(segFullNameList); + } + + public void addHTableNamesForCube(CubeInstance cube, List<String> segFullNameList) { + for (CubeSegment seg : cube.getSegments()) { + String tableName = seg.getStorageLocationIdentifier(); + segFullNameList.add(tableName + "," + cube.getName()); + } + } + + public void check(List<String> segFullNameList) { + issueExistHTables = Lists.newArrayList(); + inconsistentHTables = Lists.newArrayList(); + + for (String segFullName : segFullNameList) { + String[] sepNameList = segFullName.split(","); + try { + HTableDescriptor hTableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); + String host = hTableDescriptor.getValue(IRealizationConstants.HTableTag); + if (!dstCfg.getMetadataUrlPrefix().equalsIgnoreCase(host)) { + inconsistentHTables.add(segFullName); + } + } catch (IOException e) { + issueExistHTables.add(segFullName); + continue; + } + } + } + + public void fixInconsistent() throws IOException { + if (ifFix == true) { + for (String segFullName : inconsistentHTables) { + String[] sepNameList = segFullName.split(","); + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); + logger.info("Change the host of htable " + sepNameList[0] + "belonging to cube " + sepNameList[1] + " from " + desc.getValue(IRealizationConstants.HTableTag) + " to " + dstCfg.getMetadataUrlPrefix()); + hbaseAdmin.disableTable(sepNameList[0]); + desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix()); + hbaseAdmin.modifyTable(sepNameList[0], desc); + hbaseAdmin.enableTable(sepNameList[0]); + } + } else { + logger.info("------ Inconsistent HTables Needed To Be Fixed ------"); + for (String hTable : inconsistentHTables) { + String[] sepNameList = hTable.split(","); + logger.info(sepNameList[0] + " belonging to cube " + sepNameList[1]); + } + logger.info("----------------------------------------------------"); + } + } + + public void printIssueExistingHTables() { + logger.info("------ HTables exist issues in hbase : not existing, metadata broken ------"); + for (String segFullName : issueExistHTables) { + String[] sepNameList = segFullName.split(","); + logger.error(sepNameList[0] + " belonging to cube " + sepNameList[1] + " has some issues and cannot be read successfully!!!"); + } + logger.info("----------------------------------------------------"); + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java new file mode 100644 index 0000000..27fa973 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.cube.CubeDescManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.IEngineAware; +import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.metadata.project.RealizationEntry; +import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.storage.hbase.HBaseConnection; +import org.apache.kylin.storage.hybrid.HybridInstance; +import org.apache.kylin.storage.hybrid.HybridManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Created by dongli on 12/29/15. 
+ */ +public class ExtendCubeToHybridCLI { + public static final String ACL_INFO_FAMILY = "i"; + private static final String CUBE_POSTFIX = "_old"; + private static final String HYBRID_POSTFIX = "_hybrid"; + private static final Logger logger = LoggerFactory.getLogger(ExtendCubeToHybridCLI.class); + private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p"; + + private KylinConfig kylinConfig; + private CubeManager cubeManager; + private CubeDescManager cubeDescManager; + private MetadataManager metadataManager; + private ResourceStore store; + + public ExtendCubeToHybridCLI() { + this.kylinConfig = KylinConfig.getInstanceFromEnv(); + this.store = ResourceStore.getStore(kylinConfig); + this.cubeManager = CubeManager.getInstance(kylinConfig); + this.cubeDescManager = CubeDescManager.getInstance(kylinConfig); + this.metadataManager = MetadataManager.getInstance(kylinConfig); + } + + public static void main(String[] args) throws Exception { + if (args.length != 2 && args.length != 3) { + System.out.println("Usage: ExtendCubeToHybridCLI project cube [partition_date]"); + return; + } + + ExtendCubeToHybridCLI tool = new ExtendCubeToHybridCLI(); + + String projectName = args[0]; + String cubeName = args[1]; + String partitionDate = args.length == 3 ? args[2] : null; + + try { + tool.createFromCube(projectName, cubeName, partitionDate); + tool.verify(); + logger.info("Job Finished."); + } catch (Exception e) { + e.printStackTrace(); + logger.error("Job Aborted.", e.getMessage()); + } + } + + private boolean validateCubeInstance(CubeInstance cubeInstance) { + if (cubeInstance == null) { + logger.error("This cube does not exist."); + return false; + } + if (cubeInstance.getSegments().isEmpty()) { + logger.error("No segments in this cube, no need to extend."); + return false; + } + return true; + } + + public void createFromCube(String projectName, String cubeName, String partitionDateStr) throws Exception { + logger.info("Create hybrid for cube[" + cubeName + "], project[" + projectName + "], partition_date[" + partitionDateStr + "]."); + + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + if (!validateCubeInstance(cubeInstance)) { + return; + } + + CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeInstance.getDescName()); + DataModelDesc dataModelDesc = metadataManager.getDataModelDesc(cubeDesc.getModelName()); + if (StringUtils.isEmpty(dataModelDesc.getPartitionDesc().getPartitionDateColumn())) { + logger.error("No incremental cube, no need to extend."); + return; + } + + String owner = cubeInstance.getOwner(); + long partitionDate = partitionDateStr != null ? 
DateFormat.stringToMillis(partitionDateStr) : 0; + + // get new name for old cube and cube_desc + String newCubeDescName = renameCube(cubeDesc.getName()); + String newCubeInstanceName = renameCube(cubeInstance.getName()); + while (cubeDescManager.getCubeDesc(newCubeDescName) != null) + newCubeDescName = renameCube(newCubeDescName); + while (cubeManager.getCube(newCubeInstanceName) != null) + newCubeInstanceName = renameCube(newCubeInstanceName); + + // create new cube_instance for old segments + CubeInstance newCubeInstance = CubeInstance.getCopyOf(cubeInstance); + newCubeInstance.setName(newCubeInstanceName); + newCubeInstance.setDescName(newCubeDescName); + newCubeInstance.updateRandomUuid(); + Iterator<CubeSegment> segmentIterator = newCubeInstance.getSegments().iterator(); + CubeSegment currentSeg = null; + while (segmentIterator.hasNext()) { + currentSeg = segmentIterator.next(); + if (partitionDateStr != null && (currentSeg.getDateRangeStart() >= partitionDate || currentSeg.getDateRangeEnd() > partitionDate)) { + segmentIterator.remove(); + logger.info("CubeSegment[" + currentSeg + "] was removed."); + } + } + if (partitionDateStr != null && partitionDate != currentSeg.getDateRangeEnd()) { + logger.error("PartitionDate must be end date of one segment."); + return; + } + if (currentSeg != null && partitionDateStr == null) + partitionDate = currentSeg.getDateRangeEnd(); + + cubeManager.createCube(newCubeInstance, projectName, owner); + logger.info("CubeInstance was saved at: " + newCubeInstance.getResourcePath()); + + // create new cube for old segments + CubeDesc newCubeDesc = CubeDesc.getCopyOf(cubeDesc); + newCubeDesc.setName(newCubeDescName); + newCubeDesc.updateRandomUuid(); + newCubeDesc.init(kylinConfig, metadataManager.getAllTablesMap()); + newCubeDesc.setPartitionDateEnd(partitionDate); + newCubeDesc.calculateSignature(); + cubeDescManager.createCubeDesc(newCubeDesc); + logger.info("CubeDesc was saved at: " + newCubeDesc.getResourcePath()); + + // update old cube_desc to new-version metadata + cubeDesc.setPartitionDateStart(partitionDate); + cubeDesc.setEngineType(IEngineAware.ID_MR_V2); + cubeDesc.setStorageType(IStorageAware.ID_SHARDED_HBASE); + cubeDesc.calculateSignature(); + cubeDescManager.updateCubeDesc(cubeDesc); + logger.info("CubeDesc was saved at: " + cubeDesc.getResourcePath()); + + // clear segments for old cube + cubeInstance.setSegments(new ArrayList<CubeSegment>()); + cubeInstance.setStatus(RealizationStatusEnum.DISABLED); + store.putResource(cubeInstance.getResourcePath(), cubeInstance, CubeManager.CUBE_SERIALIZER); + logger.info("CubeInstance was saved at: " + cubeInstance.getResourcePath()); + + // create hybrid model for these two cubes + List<RealizationEntry> realizationEntries = Lists.newArrayListWithCapacity(2); + realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, cubeInstance.getName())); + realizationEntries.add(RealizationEntry.create(RealizationType.CUBE, newCubeInstance.getName())); + HybridInstance hybridInstance = HybridInstance.create(kylinConfig, renameHybrid(cubeInstance.getName()), realizationEntries); + store.putResource(hybridInstance.getResourcePath(), hybridInstance, HybridManager.HYBRID_SERIALIZER); + ProjectManager.getInstance(kylinConfig).moveRealizationToProject(RealizationType.HYBRID, hybridInstance.getName(), projectName, owner); + logger.info("HybridInstance was saved at: " + hybridInstance.getResourcePath()); + + // copy Acl from old cube to new cube + copyAcl(cubeInstance.getId(), newCubeInstance.getId(), 
projectName); + logger.info("Acl copied from [" + cubeName + "] to [" + newCubeInstanceName + "]."); + } + + private void verify() { + CubeDescManager.clearCache(); + CubeDescManager.getInstance(kylinConfig); + + CubeManager.clearCache(); + CubeManager.getInstance(kylinConfig); + + ProjectManager.clearCache(); + ProjectManager.getInstance(kylinConfig); + + HybridManager.clearCache(); + HybridManager.getInstance(kylinConfig); + } + + private String renameCube(String origName) { + return origName + CUBE_POSTFIX; + } + + private String renameHybrid(String origName) { + return origName + HYBRID_POSTFIX; + } + + private void copyAcl(String origCubeId, String newCubeId, String projectName) throws Exception { + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer); + String projUUID = project.getUuid(); + HTableInterface aclHtable = null; + try { + aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl"); + + // cube acl + Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId))); + if (result.listCells() != null) { + for (Cell cell : result.listCells()) { + byte[] family = CellUtil.cloneFamily(cell); + byte[] column = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + + // use the target project uuid as the parent + if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) { + String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}"; + value = Bytes.toBytes(valueString); + } + Put put = new Put(Bytes.toBytes(newCubeId)); + put.add(family, column, value); + aclHtable.put(put); + } + } + aclHtable.flushCommits(); + } finally { + IOUtils.closeQuietly(aclHtable); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java new file mode 100644 index 0000000..94962ff --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/MetadataCleanupJob.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.tool; + +import java.util.Arrays; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutableOutputPO; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.execution.ExecutableState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + */ +public class MetadataCleanupJob extends AbstractHadoopJob { + + @SuppressWarnings("static-access") + private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete"); + + protected static final Logger logger = LoggerFactory.getLogger(MetadataCleanupJob.class); + + boolean delete = false; + + private KylinConfig config = null; + + public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000L; // 2 days + public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.util.Tool#run(java.lang.String[]) + */ + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + logger.info("jobs args: " + Arrays.toString(args)); + options.addOption(OPTION_DELETE); + parseOptions(options, args); + + logger.info("options: '" + getOptionsAsString() + "'"); + logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'"); + delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE)); + + config = KylinConfig.getInstanceFromEnv(); + + cleanup(); + + return 0; + } + + private ResourceStore getStore() { + return ResourceStore.getStore(config); + } + + private boolean isOlderThanThreshold(long resourceTime) { + long currentTime = System.currentTimeMillis(); + + if (currentTime - resourceTime > TIME_THREADSHOLD) + return true; + return false; + } + + public void cleanup() throws Exception { + CubeManager cubeManager = CubeManager.getInstance(config); + + List<String> toDeleteResource = Lists.newArrayList(); + + // two level resources, snapshot tables and cube statistics + for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT, ResourceStore.CUBE_STATISTICS_ROOT }) { + NavigableSet<String> snapshotTables = getStore().listResources(resourceRoot); + + if (snapshotTables != null) { + for (String snapshotTable : snapshotTables) { + NavigableSet<String> snapshotNames = getStore().listResources(snapshotTable); + if (snapshotNames != null) + for (String snapshot : snapshotNames) { + if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot))) + toDeleteResource.add(snapshot); + } + } + } + } + + // three level resources, only dictionaries + NavigableSet<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT); + + if (dictTables != null) { + for (String table : dictTables) { + NavigableSet<String> tableColNames = getStore().listResources(table); + if (tableColNames != null) + for (String tableCol : tableColNames) { + NavigableSet<String> dictionaries = 
getStore().listResources(tableCol); + if (dictionaries != null) + for (String dict : dictionaries) + if (isOlderThanThreshold(getStore().getResourceTimestamp(dict))) + toDeleteResource.add(dict); + } + } + } + + Set<String> activeResourceList = Sets.newHashSet(); + for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) { + for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) { + activeResourceList.addAll(segment.getSnapshotPaths()); + activeResourceList.addAll(segment.getDictionaryPaths()); + activeResourceList.add(segment.getStatisticsResourcePath()); + } + } + + toDeleteResource.removeAll(activeResourceList); + + // delete old and completed jobs + ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv()); + List<ExecutablePO> allExecutable = executableDao.getJobs(); + for (ExecutablePO executable : allExecutable) { + long lastModified = executable.getLastModified(); + ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid()); + if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (ExecutableState.SUCCEED.toString().equals(output.getStatus()) || ExecutableState.DISCARDED.toString().equals(output.getStatus()))) { + toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid()); + toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid()); + + for (ExecutablePO task : executable.getTasks()) { + toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid()); + } + } + } + + if (toDeleteResource.size() > 0) { + logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n"); + + for (String s : toDeleteResource) { + logger.info(s); + if (delete == true) { + getStore().deleteResource(s); + } + } + } else { + logger.info("No resource to be cleaned up from metadata store;"); + } + + } + + public static void main(String[] args) throws Exception { + int exitCode = ToolRunner.run(new MetadataCleanupJob(), args); + System.exit(exitCode); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7358a78a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java new file mode 100644 index 0000000..c1ff753 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
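The relocated MetadataCleanupJob above follows a simple mark-and-sweep pattern: collect resource paths older than a threshold, subtract everything still referenced by a live cube segment, and only delete when the delete option is set. The sketch below shows that pattern in isolation; the resource map and path names are illustrative and do not use the Kylin ResourceStore API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MetadataSweepSketch {

    static final long TWO_DAYS_MS = 2 * 24 * 3600 * 1000L;

    /** Mark: everything older than the threshold is a deletion candidate. */
    static List<String> markStale(Map<String, Long> resourceTimestamps, long now) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Long> e : resourceTimestamps.entrySet()) {
            if (now - e.getValue() > TWO_DAYS_MS) {
                candidates.add(e.getKey());
            }
        }
        return candidates;
    }

    /** Sweep: drop candidates still referenced by active segments; delete only if asked to. */
    static List<String> sweep(List<String> candidates, Set<String> activeResources, boolean delete) {
        candidates.removeAll(activeResources);
        for (String path : candidates) {
            if (delete) {
                System.out.println("deleting " + path);     // the real job calls the store's delete
            } else {
                System.out.println("would delete " + path); // dry run when the delete option is not set
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Map<String, Long> store = new HashMap<>();
        store.put("/dict/T/COL/old.dict", now - 3 * TWO_DAYS_MS);              // stale
        store.put("/dict/T/COL/new.dict", now);                                 // fresh
        store.put("/table_snapshot/T/in_use.snapshot", now - 3 * TWO_DAYS_MS);  // stale but referenced
        Set<String> active = new HashSet<>();
        active.add("/table_snapshot/T/in_use.snapshot");
        sweep(markStale(store, now), active, false);
    }
}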
+ */ + +package org.apache.kylin.tool; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; + +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.CliCommandExecutor; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.util.HiveCmdBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StorageCleanupJob extends AbstractApplication { + + @SuppressWarnings("static-access") + protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete"); + protected static final Option OPTION_FORCE = OptionBuilder.withArgName("force").hasArg().isRequired(false).withDescription("Warning: will delete any intermediate hive tables").create("force"); + + protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class); + public static final int TIME_THRESHOLD_DELETE_HTABLE = 10; // Unit minute + + protected boolean delete = false; + protected boolean force = false; + protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); + + private void cleanUnusedHBaseTables(Configuration conf) throws IOException { + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + long TIME_THREADSHOLD = KylinConfig.getInstanceFromEnv().getStorageCleanupTimeThreshold(); + // get all kylin hbase tables + HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; + HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); + List<String> allTablesNeedToBeDropped = new ArrayList<String>(); + for (HTableDescriptor desc : tableDescriptors) { + String host = desc.getValue(IRealizationConstants.HTableTag); + String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime); + if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) { + //only take care htables that belongs to self, 
and created more than 2 days + if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THREADSHOLD)) { + allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString()); + } else { + logger.info("Exclude table " + desc.getTableName().getNameAsString() + " from drop list, as it is newly created"); + } + } + } + + // remove every segment htable from drop list + for (CubeInstance cube : cubeMgr.listAllCubes()) { + for (CubeSegment seg : cube.getSegments()) { + String tablename = seg.getStorageLocationIdentifier(); + if (allTablesNeedToBeDropped.contains(tablename)) { + allTablesNeedToBeDropped.remove(tablename); + logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus()); + } + } + } + + if (delete == true) { + // drop tables + ExecutorService executorService = Executors.newSingleThreadExecutor(); + for (String htableName : allTablesNeedToBeDropped) { + FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName)); + executorService.execute(futureTask); + try { + futureTask.get(TIME_THRESHOLD_DELETE_HTABLE, TimeUnit.MINUTES); + } catch (TimeoutException e) { + logger.warn("It fails to delete htable " + htableName + ", for it cost more than " + TIME_THRESHOLD_DELETE_HTABLE + " minutes!"); + futureTask.cancel(true); + } catch (Exception e) { + e.printStackTrace(); + futureTask.cancel(true); + } + } + executorService.shutdown(); + } else { + System.out.println("--------------- Tables To Be Dropped ---------------"); + for (String htableName : allTablesNeedToBeDropped) { + System.out.println(htableName); + } + System.out.println("----------------------------------------------------"); + } + + hbaseAdmin.close(); + } + + @Override + protected Options getOptions() { + Options options = new Options(); + options.addOption(OPTION_DELETE); + options.addOption(OPTION_FORCE); + return options; + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); + logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'"); + logger.info("force option value: '" + optionsHelper.getOptionValue(OPTION_FORCE) + "'"); + delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE)); + force = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_FORCE)); + + Configuration conf = HBaseConfiguration.create(); + + cleanUnusedIntermediateHiveTable(conf); + cleanUnusedHdfsFiles(conf); + cleanUnusedHBaseTables(conf); + + } + + class DeleteHTableRunnable implements Callable { + HBaseAdmin hbaseAdmin; + String htableName; + + DeleteHTableRunnable(HBaseAdmin hbaseAdmin, String htableName) { + this.hbaseAdmin = hbaseAdmin; + this.htableName = htableName; + } + + public Object call() throws Exception { + logger.info("Deleting HBase table " + htableName); + if (hbaseAdmin.tableExists(htableName)) { + if (hbaseAdmin.isTableEnabled(htableName)) { + hbaseAdmin.disableTable(htableName); + } + + hbaseAdmin.deleteTable(htableName); + logger.info("Deleted HBase table " + htableName); + } else { + logger.info("HBase table" + htableName + " does not exist"); + } + return null; + } + } + + private void cleanUnusedHdfsFiles(Configuration conf) throws IOException { + JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + + FileSystem fs = FileSystem.get(conf); + List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>(); + // GlobFilter filter = new + // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + // + "/kylin-.*"); + // TODO: when first use, /kylin/kylin_metadata does not exist. + FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())); + for (FileStatus status : fStatus) { + String path = status.getPath().getName(); + // System.out.println(path); + if (path.startsWith("kylin-")) { + String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path; + allHdfsPathsNeedToBeDeleted.add(kylinJobPath); + } + } + + List<String> allJobs = executableManager.getAllJobIds(); + for (String jobId : allJobs) { + // only remove FINISHED and DISCARDED job intermediate files + final ExecutableState state = executableManager.getOutput(jobId).getState(); + if (!state.isFinalState()) { + String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId); + allHdfsPathsNeedToBeDeleted.remove(path); + logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state); + } + } + + // remove every segment working dir from deletion list + for (CubeInstance cube : cubeMgr.listAllCubes()) { + for (CubeSegment seg : cube.getSegments()) { + String jobUuid = seg.getLastBuildJobID(); + if (jobUuid != null && jobUuid.equals("") == false) { + String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid); + allHdfsPathsNeedToBeDeleted.remove(path); + logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName()); + } + } + } + + if (delete == true) { + // remove files + for (String hdfsPath : allHdfsPathsNeedToBeDeleted) { + logger.info("Deleting hdfs path " + hdfsPath); + Path p = new Path(hdfsPath); + if (fs.exists(p) == true) { + fs.delete(p, true); + logger.info("Deleted hdfs path " + hdfsPath); + } else { + logger.info("Hdfs path " + hdfsPath + "does not exist"); + } + } + } else { + System.out.println("--------------- HDFS Path To Be Deleted ---------------"); + for (String hdfsPath : allHdfsPathsNeedToBeDeleted) { + System.out.println(hdfsPath); + } + System.out.println("-------------------------------------------------------"); + } + + } + + private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException { + final KylinConfig config = KylinConfig.getInstanceFromEnv(); + final CliCommandExecutor cmdExec = config.getCliCommandExecutor(); + final int uuidLength = 36; + final String preFix = "kylin_intermediate_"; + final String uuidPattern = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + + + final String useDatabaseHql = "USE " + config.getHiveDatabaseForIntermediateTable() + ";"; + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(useDatabaseHql); + hiveCmdBuilder.addStatement("show tables " + "\'kylin_intermediate_*\'" + "; "); + + Pair<Integer, String> result = cmdExec.execute(hiveCmdBuilder.build()); + + String outputStr = result.getSecond(); + BufferedReader reader = new BufferedReader(new StringReader(outputStr)); + String line = null; + List<String> allJobs = executableManager.getAllJobIds(); + List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>(); + List<String> workingJobList = new ArrayList<String>(); + + 
StringBuilder sb = new StringBuilder(); + for (String jobId : allJobs) { + // only remove FINISHED and DISCARDED job intermediate table + final ExecutableState state = executableManager.getOutput(jobId).getState(); + if (!state.isFinalState()) { + workingJobList.add(jobId); + sb.append(jobId).append("(").append(state).append("), "); + } + } + logger.info("Working jobIDs: " + workingJobList); + + while ((line = reader.readLine()) != null) { + + logger.info("Checking table " + line); + + if (!line.startsWith(preFix)) + continue; + + if (force == true) { + logger.warn("!!!!!!!!!!!!!!!Warning: will delete all intermediate hive tables!!!!!!!!!!!!!!!!!!!!!!"); + allHiveTablesNeedToBeDeleted.add(line); + continue; + } + + boolean isNeedDel = true; + + if (line.length() > preFix.length() + uuidLength) { + String uuid = line.substring(line.length() - uuidLength, line.length()); + uuid = uuid.replace("_", "-"); + final Pattern UUId_PATTERN = Pattern.compile(uuidPattern); + if (UUId_PATTERN.matcher(uuid).matches()) { + //Check whether it's a hive table in use + if (isTableInUse(uuid, workingJobList)) { + logger.info("Skip because not isTableInUse"); + isNeedDel = false; + } + } else { + logger.info("Skip because not match pattern"); + isNeedDel = false; + } + } else { + logger.info("Skip because length not qualified"); + isNeedDel = false; + } + + if (isNeedDel) { + allHiveTablesNeedToBeDeleted.add(line); + } + } + + if (delete == true) { + hiveCmdBuilder.reset(); + hiveCmdBuilder.addStatement(useDatabaseHql); + for (String delHive : allHiveTablesNeedToBeDeleted) { + hiveCmdBuilder.addStatement("drop table if exists " + delHive + "; "); + logger.info("Remove " + delHive + " from hive tables."); + } + + try { + cmdExec.execute(hiveCmdBuilder.build()); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + System.out.println("------ Intermediate Hive Tables To Be Dropped ------"); + for (String hiveTable : allHiveTablesNeedToBeDeleted) { + System.out.println(hiveTable); + } + System.out.println("----------------------------------------------------"); + } + + if (reader != null) + reader.close(); + } + + private boolean isTableInUse(String segUuid, List<String> workingJobList) { + for (String jobId : workingJobList) { + AbstractExecutable abstractExecutable = executableManager.getJob(jobId); + String segmentId = abstractExecutable.getParam("segmentId"); + + if (null == segmentId) + continue; + + return segUuid.equals(segmentId); + } + return false; + } + + public static void main(String[] args) throws Exception { + StorageCleanupJob cli = new StorageCleanupJob(); + cli.execute(args); + } +}
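The relocated StorageCleanupJob above guards each HTable drop with a timeout so a hung disable/delete call cannot stall the whole sweep. Below is the same FutureTask pattern in isolation; slowDrop is a placeholder for the HBaseAdmin disable/delete pair and the names are illustrative.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DropWithTimeoutSketch {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Placeholder for hbaseAdmin.disableTable(...) + hbaseAdmin.deleteTable(...),
        // which can block for a long time on an unhealthy cluster.
        Callable<Void> slowDrop = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Thread.sleep(5_000); // pretend the drop takes 5 seconds
                return null;
            }
        };

        FutureTask<Void> task = new FutureTask<>(slowDrop);
        executor.execute(task);
        try {
            task.get(1, TimeUnit.SECONDS); // StorageCleanupJob waits up to 10 minutes per table
        } catch (TimeoutException e) {
            System.out.println("drop timed out, cancelling and moving on");
            task.cancel(true); // interrupt the stuck drop instead of blocking the whole cleanup
        } finally {
            executor.shutdownNow();
        }
    }
}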
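cleanUnusedHdfsFiles works by subtraction as well: every kylin-* directory under the HDFS working directory starts out on the deletion list, then paths owned by unfinished jobs or by a segment's last build job are taken back off it. The same filtering expressed with plain collections; the path strings and job ids below are made up for the example.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HdfsSweepSketch {

    static List<String> pathsToDelete(List<String> jobDirs, Set<String> unfinishedJobIds, Set<String> lastBuildJobIds) {
        List<String> toDelete = new ArrayList<>(jobDirs);
        for (String dir : jobDirs) {
            // working directories are assumed to end with "kylin-<jobId>";
            // keep the ones a running job or an existing segment still needs
            String jobId = dir.substring(dir.lastIndexOf("kylin-") + "kylin-".length());
            if (unfinishedJobIds.contains(jobId) || lastBuildJobIds.contains(jobId)) {
                toDelete.remove(dir);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList(
                "/kylin/kylin_metadata/kylin-job-aaa",
                "/kylin/kylin_metadata/kylin-job-bbb",
                "/kylin/kylin_metadata/kylin-job-ccc");
        Set<String> running = new HashSet<>(Arrays.asList("job-aaa"));    // not in a final state yet
        Set<String> lastBuilds = new HashSet<>(Arrays.asList("job-bbb")); // referenced by a segment
        System.out.println(pathsToDelete(dirs, running, lastBuilds));     // [/kylin/kylin_metadata/kylin-job-ccc]
    }
}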
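Finally, the intermediate-table sweep decides whether a kylin_intermediate_* Hive table is droppable by taking the trailing 36 characters of its name, turning the underscores back into dashes, checking that the result is a UUID, and comparing it against the segmentId of every job that is not yet in a final state. A standalone sketch of that name check follows; the table names and segment ids are invented, and the helper is not the exact Kylin implementation.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class IntermediateTableSketch {

    static final String PREFIX = "kylin_intermediate_";
    static final int UUID_LENGTH = 36;
    static final Pattern UUID_PATTERN =
            Pattern.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}");

    /** True if the table looks like a Kylin intermediate table and no working job still owns its segment. */
    static boolean isDroppable(String tableName, Set<String> workingSegmentIds) {
        if (!tableName.startsWith(PREFIX) || tableName.length() <= PREFIX.length() + UUID_LENGTH) {
            return false; // not shaped like an intermediate table, leave it alone
        }
        String uuid = tableName.substring(tableName.length() - UUID_LENGTH).replace('_', '-');
        if (!UUID_PATTERN.matcher(uuid).matches()) {
            return false; // suffix is not a segment uuid
        }
        return !workingSegmentIds.contains(uuid); // droppable only when no running job references it
    }

    public static void main(String[] args) {
        Set<String> working = new HashSet<>(
                Arrays.asList("11111111-2222-3333-4444-555555555555"));
        String inUse  = "kylin_intermediate_my_cube_11111111_2222_3333_4444_555555555555";
        String orphan = "kylin_intermediate_my_cube_aaaaaaaa_bbbb_cccc_dddd_eeeeeeeeeeee";
        System.out.println(isDroppable(inUse, working));  // false
        System.out.println(isDroppable(orphan, working)); // true
    }
}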