This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new e6ca98d  KYLIN-4923 CubeMigration tool supports migrating metadata from 2.x/3.x clusters to a 4.0 cluster

e6ca98d is described below

commit e6ca98d9e047efee4e6505e55f5aca573c9c0052
Author: zhengshengjun <shengjun_zh...@sina.com>
AuthorDate: Mon Mar 8 16:52:50 2021 +0800

    KYLIN-4923 CubeMigration tool supports migrating metadata from 2.x/3.x clusters to a 4.0 cluster
---
 .../org/apache/kylin/tool/CubeMigrationCLI.java    | 274 +++++++++++++++------
 1 file changed, 197 insertions(+), 77 deletions(-)

diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 8a18f25..fd978a3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -18,26 +18,38 @@
 package org.apache.kylin.tool;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Locale;
+import java.util.Arrays;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.persistence.ContentReader;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -55,7 +67,10 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
@@ -82,110 +97,161 @@ public class CubeMigrationCLI extends AbstractApplication {
     protected Configuration conf;
     protected boolean doAclCopy = false;
     protected boolean doOverwrite = false;
-    protected boolean doMigrateSegment = true;
+    protected boolean doMigrateSegment = false;
     protected String dstProject;
     protected String srcHdfsWorkDir;
     protected String dstHdfsWorkDir;
+    private boolean realExecute;
+    private boolean purgeAndDisable;
+    private OptionsHelper optHelper;
 
     private static final String ACL_PREFIX = "/acl/";
     private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
 
-    public static void main(String[] args) throws IOException, InterruptedException {
+    private static final Option OPTION_SRC_CONFIG = OptionBuilder.isRequired(true).hasArg().withDescription("The KylinConfig of the cube’s source").create("srcConfig");
+    private static final Option OPTION_DST_CONFIG = OptionBuilder.isRequired(true).hasArg().withDescription("The KylinConfig of the cube’s new home").create("dstConfig");
+    private static final Option OPTION_ALL_CUBES = OptionBuilder.isRequired(false).withDescription("migrate all cubes meta from source cluster").create("allCubes");
+    private static final Option OPTION_CUBE = OptionBuilder.isRequired(false).hasArg().withDescription("Cube name to migrate").create("cube");
+    private static final Option OPTION_DST_PROJECT = OptionBuilder.isRequired(false).hasArg().withDescription("cube's new project home, if not set, keep the same as source cluster").create("dstProject");
+    private static final Option OPTION_SRC_PROJECT = OptionBuilder.isRequired(false).hasArg().withDescription("source project to migrate").create("srcProject");
+    private static final Option OPTION_COPY_ACL = OptionBuilder.isRequired(false).hasArg().withDescription("copy ACL").create("copyAcl");
+    private static final Option OPTION_PURGE_AND_DISABLE = OptionBuilder.isRequired(false).withDescription("purge source cluster data").create("purgeAndDisable");
+    private static final Option OPTION_OVERWRITE = OptionBuilder.isRequired(false).withDescription("overwrite target cluster's meta if exists").create("overwriteIfExists");
+    private static final Option OPTION_EXECUTE = OptionBuilder.isRequired(false).hasArg().withDescription("execute migration").create("realMigrate");
+    private static final Option OPTION_MIGRATE_SEGMENTS = OptionBuilder.isRequired(false).withDescription("migrate segment data").create("migrateSegment");
+
+    public static void main(String[] args) throws Exception {
+        CubeMigrationCLI cli = new CubeMigrationCLI();
+        cli.init(args);
+        cli.moveCube();
+    }
+
+    public void init(String[] args) {
+        optHelper = new OptionsHelper();
         CubeMigrationCLI cli = new CubeMigrationCLI();
-        if (args.length != 8 && args.length != 9) {
-            cli.usage();
+        try {
+            optHelper.parseOptions(cli.getOptions(), args);
+        } catch (Exception e) {
+            logger.error("failed to parse arguments", e);
+            optHelper.printUsage("CubeMigrationCLI", cli.getOptions());
             System.exit(1);
         }
-        if (args.length == 8) {
-            cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
-        } else if (args.length == 9) {
-            cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8]);
-        }
+        doAclCopy = optHelper.hasOption(OPTION_COPY_ACL);
+        doOverwrite = optHelper.hasOption(OPTION_OVERWRITE);
+        doMigrateSegment = optHelper.hasOption(OPTION_MIGRATE_SEGMENTS);
+        purgeAndDisable = optHelper.hasOption(OPTION_PURGE_AND_DISABLE);
+        srcConfig = KylinConfig.createInstanceFromUri(optHelper.getOptionValue(OPTION_SRC_CONFIG));
+        srcStore = ResourceStore.getStore(srcConfig);
+        dstConfig = KylinConfig.createInstanceFromUri(optHelper.getOptionValue(OPTION_DST_CONFIG));
+        dstStore = ResourceStore.getStore(dstConfig);
+        realExecute = optHelper.hasOption(OPTION_EXECUTE) ? Boolean.valueOf(optHelper.getOptionValue(OPTION_EXECUTE)) : true;
+        dstProject = optHelper.getOptionValue(OPTION_DST_PROJECT);
+        conf = HadoopUtil.getCurrentConfiguration();
     }
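For reference, a minimal sketch (not part of this commit) of how the reworked, option-based entry point is driven; the metadata URIs and names below are placeholders:

    // Hypothetical invocation of the new CLI; URIs and names are placeholders.
    String[] args = new String[] {
            "-srcConfig", "src_metadata_url",   // source cluster metadata (2.x/3.x)
            "-dstConfig", "dst_metadata_url",   // target cluster metadata (4.0)
            "-cube", "sample_cube",
            "-dstProject", "sample_project",
            "-realMigrate", "false"             // dry run: only print planned operations
    };
    CubeMigrationCLI cli = new CubeMigrationCLI();
    cli.init(args);
    cli.moveCube();

Note that when -realMigrate is omitted, realExecute defaults to true and the migration runs for real, while -migrateSegment is now off by default, so only metadata is moved unless it is given explicitly.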
 
-    protected void usage() {
-        System.out.println(
-                "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute migrateSegmentOrNot");
-        System.out.println("srcKylinConfigUri: The KylinConfig of the cube’s source \n"
-                + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n"
-                + "cubeName: the name of cube to be migrated. \n"
-                + "projectName: The target project in the target environment.(Make sure it exist) \n"
-                + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n"
-                + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n"
-                + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n"
-                + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"
-                + "migrateSegmentOrNot:(optional) true or false: whether copy segment data to target environment. \n");
-    }
+    public void moveCube() throws Exception {
+        conf = HadoopUtil.getCurrentConfiguration();
 
-    public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
-            throws IOException, InterruptedException {
+        if (optHelper.hasOption(OPTION_CUBE)) {
+            CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+            moveSingleCube(optHelper.getOptionValue(OPTION_CUBE), dstProject, cubeManager);
+        } else if (optHelper.hasOption(OPTION_SRC_PROJECT)) {
+            moveAllCubesUnderProject(optHelper.getOptionValue(OPTION_SRC_PROJECT), ProjectManager.getInstance(srcConfig), ProjectManager.getInstance(dstConfig));
+        } else if (optHelper.hasOption(OPTION_ALL_CUBES)) {
+            moveAllProjectAndCubes();
+        }
 
-        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
-                projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
     }
 
-    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
-            throws IOException, InterruptedException {
+    private void moveAllCubesUnderProject(String srcProject, ProjectManager srcProjectManager, ProjectManager dstProjectManager) throws Exception {
+        ProjectInstance srcProjectInstance = srcProjectManager.getProject(srcProject);
 
-        moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
-                Boolean.parseBoolean(purgeAndDisable), Boolean.parseBoolean(overwriteIfExists),
-                Boolean.parseBoolean(realExecute), true);
-    }
+        if (StringUtils.isEmpty(dstProject)) {
+            if (null == dstProjectManager.getProject(srcProject)) {
+                dstProjectManager.createProject(srcProjectInstance.getName(),
+                        srcProjectInstance.getOwner(), srcProjectInstance.getDescription(), srcProjectInstance.getOverrideKylinProps());
+            }
+        }
 
-    public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
-            throws IOException, InterruptedException {
+        CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+
+        srcProjectInstance.getRealizationEntries(RealizationType.CUBE).forEach(cube -> {
+            try {
+                if (StringUtils.isEmpty(dstProject)) {
+                    moveSingleCube(cube.getRealization(), srcProject, cubeManager);
+                } else {
+                    moveSingleCube(cube.getRealization(), dstProject, cubeManager);
+                }
+            } catch (Exception e) {
+                logger.error("failed to move cube: {}", cube.getRealization(), e);
+            }
+        });
 
-        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
-                projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
-                Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute),
-                Boolean.parseBoolean(migrateSegment));
     }
 
-    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
-            boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
-            throws IOException, InterruptedException {
-        doAclCopy = copyAcl;
-        doOverwrite = overwriteIfExists;
-        doMigrateSegment = migrateSegment;
-        srcConfig = srcCfg;
-        srcStore = ResourceStore.getStore(srcConfig);
-        dstConfig = dstCfg;
-        dstStore = ResourceStore.getStore(dstConfig);
-        dstProject = projectName;
-        conf = HadoopUtil.getCurrentConfiguration();
+    private void moveAllProjectAndCubes() throws Exception {
+        ProjectManager srcProjectManager = ProjectManager.getInstance(srcConfig);
+        List<ProjectInstance> projects = srcProjectManager.listAllProjects();
+        for (ProjectInstance project : projects) {
+            moveAllCubesUnderProject(project.getName(), srcProjectManager, ProjectManager.getInstance(dstConfig));
+        }
+    }
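For reference, a minimal sketch (not part of this commit) of the enumeration these two methods rely on; the project name is a placeholder, and only API already used in this diff appears:

    // Listing the cube realizations of one source project, as moveAllCubesUnderProject does.
    ProjectInstance project = ProjectManager.getInstance(srcConfig).getProject("sample_project");
    project.getRealizationEntries(RealizationType.CUBE)
            .forEach(entry -> logger.info("cube to migrate: {}", entry.getRealization()));

Note the per-cube try/catch in the forEach above: a failure on one cube is logged and the batch continues with the rest.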
-        CubeManager cubeManager = CubeManager.getInstance(srcConfig);
+    private void moveSingleCube(String cubeName, String dstProject, CubeManager cubeManager) throws IOException, InterruptedException {
         CubeInstance cube = cubeManager.getCube(cubeName);
-        srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject());
+
+        //if -dstProject option is not set, copy cube to its origin project name
+        if (StringUtils.isEmpty(dstProject)) {
+            dstProject = cube.getProject();
+        }
+
+        ProjectManager dstProjectManager = ProjectManager.getInstance(dstConfig);
+        ProjectInstance instance = dstProjectManager.getProject(dstProject);
+
+        if (null == instance) {
+            ProjectManager scrProjectManager = ProjectManager.getInstance(srcConfig);
+            ProjectInstance originProject = scrProjectManager.getProject(cube.getProject());
+            //create the same dstProject from cube's original project information
+            dstProjectManager.createProject(dstProject, originProject.getOwner(),
+                    originProject.getDescription(), originProject.getOverrideKylinProps());
+        }
+
+        if (null == cube) {
+            logger.warn("source cube: {} not exists", cubeName);
+            return;
+        }
+
+        srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(dstProject);
         dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject);
 
         logger.info("cube to be moved is : " + cubeName);
 
-        if (migrateSegment) {
+        if (doMigrateSegment) {
             checkCubeState(cube);
+            KylinVersion srcVersion = new KylinVersion(cube.getVersion());
+            if (srcVersion.major != KylinVersion.getCurrentVersion().major) {
+                throw new IllegalArgumentException(String.format(Locale.ROOT,
+                        "can not migrate segment data from version: %s to version: %s",
+                        srcVersion.toString(), KylinVersion.getCurrentVersion().toString()));
+            }
         }
 
         checkAndGetMetadataUrl();
 
         hdfsFs = HadoopUtil.getWorkingFileSystem();
         operations = new ArrayList<Opt>();
-        copyFilesInMetaStore(cube);
-        if (!migrateSegment) {
+        copyFilesInMetaStore(cube, dstProject);
+        if (!doMigrateSegment) {
             clearSegments(cubeName); // this should be after copyFilesInMetaStore
         }
-        addCubeAndModelIntoProject(cube, cubeName);
+        addCubeAndModelIntoProject(cube, cubeName, dstProject);
 
-        if (migrateSegment && purgeAndDisable) {
+        if (doMigrateSegment && purgeAndDisable) {
             purgeAndDisable(cubeName); // this should be the last action
         }
 
         if (realExecute) {
             doOpts();
-            updateMeta(dstConfig, projectName, cubeName, cube.getModel());
+            updateMeta(dstConfig, dstProject, cubeName, cube.getModel());
         } else {
             showOpts();
         }
@@ -211,10 +277,10 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     protected void clearSegments(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}));
+        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}, null));
     }
 
-    protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
+    protected void copyFilesInMetaStore(CubeInstance cube, String dstProject) throws IOException {
 
         if (dstStore.exists(cube.getResourcePath()) && !doOverwrite)
             throw new IllegalStateException("The cube named " + cube.getName()
@@ -227,30 +293,30 @@ public class CubeMigrationCLI extends AbstractApplication {
         listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles);
 
         for (String item : metaItems) {
-            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}));
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}, dstProject));
         }
 
         if (doMigrateSegment) {
             for (String item : dictAndSnapshot) {
-                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}));
+                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}, dstProject));
             }
 
             for (int i = 0; i < srcParquetFiles.size(); i++) {
-                operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}));
+                operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}, dstProject));
             }
         }
     }
 
-    protected void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName) throws IOException {
+    protected void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String dstProject) throws IOException {
         String projectResPath = ProjectInstance.concatResourcePath(dstProject);
         if (!dstStore.exists(projectResPath))
             throw new IllegalStateException("The target project " + dstProject + " does not exist");
 
-        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject}));
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName}, dstProject));
     }
 
     private void purgeAndDisable(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}));
+        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}, null));
     }
 
     private List<String> getCompatibleTablePath(Set<TableRef> tableRefs, String project, String rootPath)
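A note on the version guard in moveSingleCube above: metadata can cross major versions, but segment data cannot, since 3.x segments are HBase-backed while 4.0 segments are Parquet files. A minimal sketch (not part of this commit), assuming a cube stamped with version 3.1.0 on a 4.0 target:

    // Segment data may only move between clusters that share a major Kylin version.
    KylinVersion srcVersion = new KylinVersion("3.1.0");          // version recorded on the cube
    KylinVersion dstVersion = KylinVersion.getCurrentVersion();   // e.g. 4.0.x on the target
    if (srcVersion.major != dstVersion.major) {
        // fall back to a metadata-only migration, i.e. leave -migrateSegment off
        throw new IllegalArgumentException("can not migrate segment data across major versions");
    }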
@@ -324,6 +390,22 @@ public class CubeMigrationCLI extends AbstractApplication {
     @Override
     protected Options getOptions() {
         Options options = new Options();
+        options.addOption(OPTION_SRC_CONFIG);
+        options.addOption(OPTION_DST_CONFIG);
+
+        OptionGroup srcGroup = new OptionGroup();
+        srcGroup.addOption(OPTION_ALL_CUBES);
+        srcGroup.addOption(OPTION_CUBE);
+        srcGroup.addOption(OPTION_SRC_PROJECT);
+        srcGroup.setRequired(true);
+        options.addOptionGroup(srcGroup);
+
+        options.addOption(OPTION_DST_PROJECT);
+        options.addOption(OPTION_OVERWRITE);
+        options.addOption(OPTION_COPY_ACL);
+        options.addOption(OPTION_PURGE_AND_DISABLE);
+        options.addOption(OPTION_EXECUTE);
+        options.addOption(OPTION_MIGRATE_SEGMENTS);
         return options;
     }
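The required OptionGroup above is what makes -allCubes, -cube and -srcProject mutually exclusive: commons-cli rejects a command line that selects none of them or more than one. A standalone sketch (not part of this commit) of that behavior, with two placeholder options:

    // A required group: exactly one member option must be present on the command line.
    OptionGroup scope = new OptionGroup();
    scope.addOption(OptionBuilder.isRequired(false).withDescription("all cubes").create("allCubes"));
    scope.addOption(OptionBuilder.isRequired(false).hasArg().withDescription("one cube").create("cube"));
    scope.setRequired(true);

    Options options = new Options();
    options.addOptionGroup(scope);
    // Parsing "-allCubes -cube c1", or a command line with neither option, now
    // fails in OptionsHelper.parseOptions (as called from init()), which is what
    // lets moveCube() assume exactly one dispatch branch applies.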
@@ -335,17 +417,19 @@ public class CubeMigrationCLI extends AbstractApplication {
         COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS
     }
 
-    protected void addOpt(OptType type, Object[] params) {
-        operations.add(new Opt(type, params));
+    protected void addOpt(OptType type, Object[] params, String dstProject) {
+        operations.add(new Opt(type, params, dstProject));
     }
 
     private class Opt {
         private OptType type;
         private Object[] params;
+        private String dstProject;
 
-        private Opt(OptType type, Object[] params) {
+        private Opt(OptType type, Object[] params, String dstProject) {
             this.type = type;
             this.params = params;
+            this.dstProject = dstProject;
         }
 
         public String toString() {
@@ -353,6 +437,7 @@ public class CubeMigrationCLI extends AbstractApplication {
             sb.append(type).append(":");
             for (Object s : params)
                 sb.append(s).append(", ");
+            sb.append(dstProject);
             return sb.toString();
         }
 
@@ -407,15 +492,50 @@ public class CubeMigrationCLI extends AbstractApplication {
             // dataModel's project maybe be different with new project.
             if (item.startsWith(ResourceStore.DATA_MODEL_DESC_RESOURCE_ROOT)) {
                 DataModelDesc dataModelDesc = srcStore.getResource(item, DataModelManager.getInstance(srcConfig).getDataModelSerializer());
+                //before copy a dataModel, check its uniqueness in new cluster. Because
+                //a model belonging to several projects is not allowed to exist, it will cause unexpected problem.
+                List<ProjectInstance> projectContainsModel = ProjectManager.getInstance(dstConfig).findProjectsByModel(dataModelDesc.getName());
+                if (projectContainsModel.size() > 1) {
+                    throw new RuntimeException(String.format(Locale.ROOT, "model: %s belongs to several projects: %s",
+                            dataModelDesc.getName(), Arrays.toString(projectContainsModel.toArray())));
+                }
+                if (projectContainsModel.size() == 1 && !opt.dstProject.equals(projectContainsModel.get(0).getName())) {
+                    throw new IllegalArgumentException(String.format(Locale.ROOT,
+                            "there already exists model: %s in project: %s, can't create model with the same name in dest project: %s",
+                            dataModelDesc.getName(), projectContainsModel.get(0).getName(), opt.dstProject));
+                }
+
-                if (dataModelDesc != null && dataModelDesc.getProjectName() != null && !dataModelDesc.getProjectName().equals(dstProject)) {
-                    dataModelDesc.setProjectName(dstProject);
+                if (dataModelDesc != null && dataModelDesc.getProjectName() != null && !dataModelDesc.getProjectName().equals(opt.dstProject)) {
+                    dataModelDesc.setProjectName(opt.dstProject);
                     dstStore.putResource(item, dataModelDesc, res.lastModified(), DataModelManager.getInstance(srcConfig).getDataModelSerializer());
                     logger.info("Item " + item + " is copied.");
+                    res.close();
                     break;
                 }
             }
+            if (item.startsWith(ResourceStore.CUBE_DESC_RESOURCE_ROOT)) {
+                JsonSerializer serializer = new JsonSerializer(CubeDesc.class, false);
+                ContentReader<CubeDesc> reader = new ContentReader<>(serializer);
+                CubeDesc cubeDesc = reader.readContent(res);
+
+                //set storage type to parquet and job engine to spark_ii
+                cubeDesc.setStorageType(IStorageAware.ID_PARQUET);
+                cubeDesc.setEngineType(IEngineAware.ID_SPARK_II);
+                cubeDesc.setVersion(KylinVersion.getCurrentVersion().toString());
+                //once storage type have changed, signature will change too
+                cubeDesc.setSignature(cubeDesc.calculateSignature());
+
+                ByteArrayOutputStream buf = new ByteArrayOutputStream();
+                DataOutputStream dout = new DataOutputStream(buf);
+                serializer.serialize(cubeDesc, dout);
+                dstStore.putResource(item, new ByteArrayInputStream(buf.toByteArray()), res.lastModified());
+                dout.close();
+                buf.close();
+                res.close();
+                break;
+            }
             dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
-            res.content().close();
+            res.close();
             logger.info("Item " + item + " is copied");
             break;
         }
@@ -446,7 +566,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         case ADD_INTO_PROJECT: {
             CubeInstance srcCube = (CubeInstance) opt.params[0];
             String cubeName = (String) opt.params[1];
-            String projectName = (String) opt.params[2];
+            String projectName = opt.dstProject;
             String modelName = srcCube.getDescriptor().getModelName();
 
             String projectResPath = ProjectInstance.concatResourcePath(projectName);
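One more note on the COPY_FILE_IN_META case above: cube descriptors are not copied byte for byte; they are deserialized, retargeted at the 4.0 storage and engine, and re-serialized. A minimal sketch (not part of this commit) of that read-modify-write round trip, assuming cubeDesc has already been read and dstStore is the target ResourceStore; the resource path is a placeholder:

    // Point the descriptor at Parquet storage and the Spark engine, refresh the
    // signature (it depends on storage type), then persist the re-serialized bytes.
    cubeDesc.setStorageType(IStorageAware.ID_PARQUET);
    cubeDesc.setEngineType(IEngineAware.ID_SPARK_II);
    cubeDesc.setSignature(cubeDesc.calculateSignature());

    JsonSerializer serializer = new JsonSerializer(CubeDesc.class, false);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream dout = new DataOutputStream(buf);
    serializer.serialize(cubeDesc, dout);
    dout.close();
    dstStore.putResource("/cube_desc/sample_cube.json",   // placeholder resource path
            new ByteArrayInputStream(buf.toByteArray()), System.currentTimeMillis());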