KYLIN-2908 Add one option for migration tool to indicate whether to migrate segment data
Signed-off-by: Zhong <nju_y...@apache.org> Signed-off-by: lidongsjtu <lid...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c704f7cd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c704f7cd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c704f7cd Branch: refs/heads/sync Commit: c704f7cda79921c5a22645492a52c94734847541 Parents: fee5730 Author: Wang Vic <jwan...@ebay.com> Authored: Wed Sep 27 11:18:22 2017 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Tue Jan 30 19:10:51 2018 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 6 + .../org/apache/kylin/tool/CubeMigrationCLI.java | 114 +++++++++++++------ 2 files changed, 87 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 27918bc..ea5006e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -383,6 +383,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, this.createTimeUTC = createTimeUTC; } + public void clearCuboids() { + cuboidBytes = null; + cuboidBytesRecommend = null; + cuboidLastOptimized = 0L; + } + public Set<Long> getCuboidsByMode(String cuboidModeName) { return getCuboidsByMode(cuboidModeName == null ? 
null : CuboidModeEnum.getByModeName(cuboidModeName)); } http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index f95139e..a4a6ab9 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -90,6 +90,7 @@ public class CubeMigrationCLI extends AbstractApplication { private HBaseAdmin hbaseAdmin; protected boolean doAclCopy = false; protected boolean doOverwrite = false; + protected boolean doMigrateSegment = true; protected String dstProject; private static final String ACL_PREFIX = "/acl/"; @@ -97,16 +98,20 @@ public class CubeMigrationCLI extends AbstractApplication { public static void main(String[] args) throws IOException, InterruptedException { CubeMigrationCLI cli = new CubeMigrationCLI(); - if (args.length != 8) { + if (args.length != 8 && args.length != 9) { cli.usage(); System.exit(1); } - cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); + if (args.length == 8) { + cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); + } else if (args.length == 9) { + cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8]); + } } protected void usage() { System.out.println( - "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute"); + "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute migrateSegmentOrNot"); System.out.println("srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new 
home \n" + "cubeName: the name of cube to be migrated. \n" @@ -114,16 +119,36 @@ public class CubeMigrationCLI extends AbstractApplication { + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" - + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"); + + "realExecute: if false, just print the operations to take, if true, do the real migration. \n" + + "migrateSegmentOrNot:(optional) true or false: whether copy segment data to target environment. \n"); } - public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, + public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { - doAclCopy = Boolean.parseBoolean(copyAcl); - doOverwrite = Boolean.parseBoolean(overwriteIfExists); + moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, + projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable), + Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute), true); + } + + public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, + String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment) + throws IOException, InterruptedException { + + moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, + projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable), + Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute), + 
Boolean.parseBoolean(migrateSegment)); + } + + public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl, + boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment) + throws IOException, InterruptedException { + doAclCopy = copyAcl; + doOverwrite = overwriteIfExists; + doMigrateSegment = migrateSegment; srcConfig = srcCfg; srcStore = ResourceStore.getStore(srcConfig); dstConfig = dstCfg; @@ -134,7 +159,10 @@ public class CubeMigrationCLI extends AbstractApplication { CubeInstance cube = cubeManager.getCube(cubeName); logger.info("cube to be moved is : " + cubeName); - checkCubeState(cube); + if (migrateSegment) { + checkCubeState(cube); + } + checkAndGetHbaseUrl(); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); @@ -142,31 +170,29 @@ public class CubeMigrationCLI extends AbstractApplication { hdfsFS = HadoopUtil.getWorkingFileSystem(); operations = new ArrayList<Opt>(); copyFilesInMetaStore(cube); - renameFoldersInHdfs(cube); - changeHtableHost(cube); + if (migrateSegment) { + renameFoldersInHdfs(cube); + changeHtableHost(cube); + } else { + clearSegments(cubeName); // this should be after copyFilesInMetaStore + } addCubeAndModelIntoProject(cube, cubeName); - if (Boolean.parseBoolean(purgeAndDisable) == true) { + if (migrateSegment && purgeAndDisable) { purgeAndDisable(cubeName); // this should be the last action } - if (Boolean.parseBoolean(realExecute) == true) { + if (realExecute) { doOpts(); - checkMigrationSuccess(dstConfig, cubeName, true); + if (migrateSegment) { + checkMigrationSuccess(dstConfig, cubeName, true); + } updateMeta(dstConfig); } else { showOpts(); } } - public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, - String purgeAndDisable, String overwriteIfExists, String realExecute) - throws IOException, InterruptedException { - - 
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, - projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute); - } - public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); checkCLI.execute(cubeName); @@ -215,22 +241,28 @@ public class CubeMigrationCLI extends AbstractApplication { } } - protected void copyFilesInMetaStore(CubeInstance cube) throws IOException { + protected void clearSegments(String cubeName) throws IOException { + operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName })); + } - List<String> metaItems = new ArrayList<String>(); - Set<String> dictAndSnapshot = new HashSet<String>(); - listCubeRelatedResources(cube, metaItems, dictAndSnapshot); + protected void copyFilesInMetaStore(CubeInstance cube) throws IOException { if (dstStore.exists(cube.getResourcePath()) && !doOverwrite) throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. 
Use overwriteIfExists to overwrite it"); + List<String> metaItems = new ArrayList<String>(); + Set<String> dictAndSnapshot = new HashSet<String>(); + listCubeRelatedResources(cube, metaItems, dictAndSnapshot); + for (String item : metaItems) { operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); } - for (String item : dictAndSnapshot) { - operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); + if (doMigrateSegment) { + for (String item : dictAndSnapshot) { + operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); + } } } @@ -285,10 +317,12 @@ public class CubeMigrationCLI extends AbstractApplication { metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_RESOURCE_ROOT)); metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT)); - for (CubeSegment segment : cube.getSegments()) { - metaResource.add(segment.getStatisticsResourcePath()); - dictAndSnapshot.addAll(segment.getSnapshotPaths()); - dictAndSnapshot.addAll(segment.getDictionaryPaths()); + if (doMigrateSegment) { + for (CubeSegment segment : cube.getSegments()) { + metaResource.add(segment.getStatisticsResourcePath()); + dictAndSnapshot.addAll(segment.getSnapshotPaths()); + dictAndSnapshot.addAll(segment.getDictionaryPaths()); + } } if (doAclCopy) { @@ -308,7 +342,7 @@ public class CubeMigrationCLI extends AbstractApplication { } protected enum OptType { - COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE + COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS } protected void addOpt(OptType type, Object[] params) { @@ -490,7 +524,7 @@ public class CubeMigrationCLI extends AbstractApplication { project.addTable(tableRef.getTableIdentity()); } - if (project.getModels().contains(modelName) == false) 
+ if (!project.getModels().contains(modelName)) project.addModel(modelName); project.removeRealization(RealizationType.CUBE, cubeName); project.addRealizationEntry(RealizationType.CUBE, cubeName); @@ -499,6 +533,20 @@ public class CubeMigrationCLI extends AbstractApplication { logger.info("Project instance for " + projectName + " is corrected"); break; } + case CLEAR_SEGMENTS: { + String cubeName = (String) opt.params[0]; + String cubeInstancePath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, CubeInstance.class, + cubeInstanceSerializer); + cubeInstance.getSegments().clear(); + cubeInstance.clearCuboids(); + cubeInstance.setCreateTimeUTC(System.currentTimeMillis()); + cubeInstance.setStatus(RealizationStatusEnum.DISABLED); + dstStore.putResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer); + logger.info("Cleared segments for " + cubeName + ", since segments has not been copied"); + break; + } case PURGE_AND_DISABLE: { String cubeName = (String) opt.params[0]; String cubeResPath = CubeInstance.concatResourcePath(cubeName);