This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new cd449ea KYLIN-4817 Refine CubeMigrationCLI for kylin4 cd449ea is described below commit cd449eab1c6f49a27bc97ecfff8e1b29af92aead Author: yaqian.zhang <598593...@qq.com> AuthorDate: Mon Nov 30 15:36:43 2020 +0800 KYLIN-4817 Refine CubeMigrationCLI for kylin4 --- .../apache/kylin/common/restclient/RestClient.java | 4 +- .../engine/spark/metadata/cube/PathManager.java | 6 + .../org/apache/kylin/tool/CubeMigrationCLI.java | 439 ++++++++------------- 3 files changed, 168 insertions(+), 281 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index 955b0ff..2e99809 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -202,7 +202,7 @@ public class RestClient { public String getKylinProperties() throws IOException { String url = baseUrl + "/admin/config"; - HttpGet request = new HttpGet(url); + HttpGet request = newGet(url); HttpResponse response = null; try { response = client.execute(request); @@ -380,7 +380,7 @@ public class RestClient { } private HttpGet newGet(String url) { - HttpGet get = new HttpGet(); + HttpGet get = new HttpGet(url); addHttpHeaders(get); return get; } diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java index 0484bfc..6444715 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java @@ -49,6 +49,12 @@ public final class PathManager { return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier; } + public static String getSegmentParquetStoragePath(String hdfsWorkDir, String cubeName, CubeSegment segment) { + String segmentName = segment.getName(); + String identifier = segment.getStorageLocationIdentifier(); + return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier; + } + /** * Delete segment path */ diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index 4612cef..550da0c 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -18,22 +18,20 @@ package org.apache.kylin.tool; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.StorageURL; import org.apache.kylin.common.persistence.JsonSerializer; @@ -42,30 +40,23 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.AbstractApplication; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.lookup.SnapshotManager; -import org.apache.kylin.dict.lookup.SnapshotTable; -import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.cube.model.DictionaryDesc; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,14 +77,17 @@ public class CubeMigrationCLI extends AbstractApplication { protected KylinConfig dstConfig; protected ResourceStore srcStore; protected ResourceStore dstStore; - protected FileSystem hdfsFS; - private HBaseAdmin hbaseAdmin; + protected FileSystem hdfsFs; + protected Configuration conf; protected boolean doAclCopy = false; protected boolean doOverwrite = false; protected boolean doMigrateSegment = true; protected String dstProject; + protected String srcHdfsWorkDir; + protected String dstHdfsWorkDir; private static final String ACL_PREFIX = "/acl/"; + private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/"; public static void main(String[] args) throws IOException, InterruptedException { @@ -125,7 +119,7 @@ public class CubeMigrationCLI extends AbstractApplication { } public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, - String purgeAndDisable, String overwriteIfExists, String realExecute) + String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, @@ -133,7 +127,7 @@ public class CubeMigrationCLI extends AbstractApplication { } public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, - String purgeAndDisable, String overwriteIfExists, String realExecute) + String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl), @@ -142,7 +136,7 @@ public class CubeMigrationCLI extends AbstractApplication { } public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, - String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment) + String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment) throws IOException, InterruptedException { moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, @@ -152,7 +146,7 @@ public class CubeMigrationCLI extends AbstractApplication { } public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl, - boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment) + boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment) throws IOException, InterruptedException { doAclCopy = copyAcl; doOverwrite = overwriteIfExists; @@ -162,26 +156,24 @@ public class CubeMigrationCLI extends AbstractApplication { dstConfig = dstCfg; dstStore = ResourceStore.getStore(dstConfig); dstProject = projectName; + conf = HadoopUtil.getCurrentConfiguration(); CubeManager cubeManager = CubeManager.getInstance(srcConfig); CubeInstance cube = cubeManager.getCube(cubeName); + srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject()); + dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject); logger.info("cube to be moved is : " + cubeName); if (migrateSegment) { checkCubeState(cube); } - checkAndGetHbaseUrl(); + checkAndGetMetadataUrl(); - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - hbaseAdmin = new HBaseAdmin(conf); - hdfsFS = HadoopUtil.getWorkingFileSystem(); + hdfsFs = HadoopUtil.getWorkingFileSystem(); operations = new ArrayList<Opt>(); copyFilesInMetaStore(cube); - if (migrateSegment) { - renameFoldersInHdfs(cube); - changeHtableHost(cube); - } else { + if (!migrateSegment) { clearSegments(cubeName); // this should be after copyFilesInMetaStore } addCubeAndModelIntoProject(cube, cubeName); @@ -192,20 +184,12 @@ public class CubeMigrationCLI extends AbstractApplication { if (realExecute) { doOpts(); - if (migrateSegment) { - checkMigrationSuccess(dstConfig, cubeName, true); - } updateMeta(dstConfig, projectName, cubeName, cube.getModel()); } else { showOpts(); } } - public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException { - CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix); - checkCLI.execute(cubeName); - } - protected void checkCubeState(CubeInstance cube) { if (cube.getStatus() != RealizationStatusEnum.READY) throw new IllegalStateException("Cannot migrate cube that is not in READY state."); @@ -217,40 +201,16 @@ public class CubeMigrationCLI extends AbstractApplication { } } - protected void checkAndGetHbaseUrl() { + protected void checkAndGetMetadataUrl() { StorageURL srcMetadataUrl = srcConfig.getMetadataUrl(); StorageURL dstMetadataUrl = dstConfig.getMetadataUrl(); logger.info("src metadata url is " + srcMetadataUrl); logger.info("dst metadata url is " + dstMetadataUrl); - - if (!"hbase".equals(srcMetadataUrl.getScheme()) || !"hbase".equals(dstMetadataUrl.getScheme())) - throw new IllegalStateException("Both metadata urls should be hbase metadata url"); - } - - protected void renameFoldersInHdfs(CubeInstance cube) throws IOException { - for (CubeSegment segment : cube.getSegments()) { - - String jobUuid = segment.getLastBuildJobID(); - String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid); - String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid); - - operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt })); - } - - } - - protected void changeHtableHost(CubeInstance cube) { - if (cube.getDescriptor().getStorageType() != IStorageAware.ID_SHARDED_HBASE) - return; - for (CubeSegment segment : cube.getSegments()) { - operations - .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() })); - } } protected void clearSegments(String cubeName) throws IOException { - operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName })); + operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName})); } protected void copyFilesInMetaStore(CubeInstance cube) throws IOException { @@ -260,16 +220,22 @@ public class CubeMigrationCLI extends AbstractApplication { + " already exists on target metadata store. Use overwriteIfExists to overwrite it"); List<String> metaItems = new ArrayList<String>(); + List<String> srcParquetFiles = new ArrayList<String>(); + List<String> dstParquetFiles = new ArrayList<String>(); Set<String> dictAndSnapshot = new HashSet<String>(); - listCubeRelatedResources(cube, metaItems, dictAndSnapshot); + listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles); for (String item : metaItems) { - operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item })); + operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item})); } if (doMigrateSegment) { for (String item : dictAndSnapshot) { - operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() })); + operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()})); + } + + for (int i = 0; i < srcParquetFiles.size(); i++) { + operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)})); } } } @@ -279,11 +245,11 @@ public class CubeMigrationCLI extends AbstractApplication { if (!dstStore.exists(projectResPath)) throw new IllegalStateException("The target project " + dstProject + " does not exist"); - operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, dstProject })); + operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject})); } private void purgeAndDisable(String cubeName) throws IOException { - operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName })); + operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName})); } private List<String> getCompatibleTablePath(Set<TableRef> tableRefs, String project, String rootPath) @@ -311,7 +277,7 @@ public class CubeMigrationCLI extends AbstractApplication { return toResource; } - protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) + protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot, List<String> srcParquetFiles, List<String> dstParquetFiles) throws IOException { CubeDesc cubeDesc = cube.getDescriptor(); @@ -326,10 +292,25 @@ public class CubeMigrationCLI extends AbstractApplication { metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT)); if (doMigrateSegment) { + for (DictionaryDesc dictionaryDesc : cubeDesc.getDictionaries()) { + String[] columnInfo = dictionaryDesc.getColumnRef().getColumnWithTable().split("\\."); + String globalDictPath; + if (columnInfo.length == 3) { + globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[1] + File.separator + columnInfo[2]; + } else { + globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[0] + File.separator + columnInfo[1]; + } + if (globalDictPath != null) { + logger.info("Add " + globalDictPath + " to migrate dict list"); + dictAndSnapshot.add(globalDictPath); + } + } for (CubeSegment segment : cube.getSegments()) { metaResource.add(segment.getStatisticsResourcePath()); dictAndSnapshot.addAll(segment.getSnapshotPaths()); - dictAndSnapshot.addAll(segment.getDictionaryPaths()); + srcParquetFiles.add(PathManager.getSegmentParquetStoragePath(srcHdfsWorkDir, cube.getName(), segment)); + dstParquetFiles.add(PathManager.getSegmentParquetStoragePath(dstHdfsWorkDir, cube.getName(), segment)); + logger.info("Add " + PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()) + " to migrate parquet file list"); } } @@ -337,11 +318,6 @@ public class CubeMigrationCLI extends AbstractApplication { metaResource.add(ACL_PREFIX + cube.getUuid()); metaResource.add(ACL_PREFIX + cube.getModel().getUuid()); } - -// if (cubeDesc.isStreamingCube()) { -// // add streaming source config info for streaming cube -// metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName())); -// } } @Override @@ -355,7 +331,7 @@ public class CubeMigrationCLI extends AbstractApplication { } protected enum OptType { - COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS + COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS } protected void addOpt(OptType type, Object[] params) { @@ -420,161 +396,94 @@ public class CubeMigrationCLI extends AbstractApplication { logger.info("Executing operation: " + opt.toString()); switch (opt.type) { - case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - logger.info("CHANGE_HTABLE_HOST is completed"); - break; - } - case COPY_FILE_IN_META: { - String item = (String) opt.params[0]; - RawResource res = srcStore.getResource(item); - if (res == null) { - logger.info("Item: {} doesn't exist, ignore it.", item); + case COPY_FILE_IN_META: { + String item = (String) opt.params[0]; + RawResource res = srcStore.getResource(item); + if (res == null) { + logger.info("Item: {} doesn't exist, ignore it.", item); + break; + } + dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified()); + res.content().close(); + logger.info("Item " + item + " is copied"); break; } - dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified()); - res.content().close(); - logger.info("Item " + item + " is copied"); - break; - } - case COPY_DICT_OR_SNAPSHOT: { - String item = (String) opt.params[0]; - - if (item.toLowerCase(Locale.ROOT).endsWith(".dict")) { - DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig); - DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig); - DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item); - - long ts = dictSrc.getLastModified(); - dictSrc.setLastModified(0);//to avoid resource store write conflict - Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig); - DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc); - dictSrc.setLastModified(ts); - - if (dictSaved == dictSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); + case COPY_DICT_OR_SNAPSHOT: { + String item = (String) opt.params[0]; + String itemPath = item.substring(item.substring(0, item.indexOf("/")).length()+1); + Path srcPath = new Path(srcHdfsWorkDir + itemPath); + Path dstPath = new Path(dstHdfsWorkDir + itemPath); + if (hdfsFs.exists(srcPath)) { + FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf); + logger.info("Copy " + srcPath + " to " + dstPath); } else { - //dictSrc is rejected because of duplication - //modify cube's dictionary path - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(dictSaved.getResourcePath()); - } - } - } - dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused"); + logger.info("Dict or snapshot " + srcPath + " is not exists, ignore it"); } - - } else if (item.toLowerCase(Locale.ROOT).endsWith(".snapshot")) { - SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig); - SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig); - SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item); - - long ts = snapSrc.getLastModified(); - snapSrc.setLastModified(0); - SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc); - snapSrc.setLastModified(ts); - - if (snapSaved == snapSrc) { - //no dup found, already saved to dest - logger.info("Item " + item + " is copied"); - + break; + } + case COPY_PARQUET_FILE: { + Path srcPath = new Path((String) opt.params[0]); + Path dstPath = new Path((String) opt.params[1]); + if (hdfsFs.exists(srcPath)) { + FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf); + logger.info("Copy " + srcPath + " to " + dstPath); } else { - String cubeName = (String) opt.params[1]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer); - for (CubeSegment segment : cube.getSegments()) { - for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) { - if (entry.getValue().equalsIgnoreCase(item)) { - entry.setValue(snapSaved.getResourcePath()); - } - } - } - dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer); - logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused"); - + logger.info("Parquet file " + srcPath + " is not exists, ignore it"); } - - } else { - logger.error("unknown item found: " + item); - logger.info("ignore it"); + break; } + case ADD_INTO_PROJECT: { + CubeInstance srcCube = (CubeInstance) opt.params[0]; + String cubeName = (String) opt.params[1]; + String projectName = (String) opt.params[2]; + String modelName = srcCube.getDescriptor().getModelName(); + + String projectResPath = ProjectInstance.concatResourcePath(projectName); + Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); + ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer); + + for (TableRef tableRef : srcCube.getModel().getAllTables()) { + project.addTable(tableRef.getTableIdentity()); + } - break; - } - case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[0]; - String dstPath = (String) opt.params[1]; - renameHDFSPath(srcPath, dstPath); - logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); - break; - } - case ADD_INTO_PROJECT: { - CubeInstance srcCube = (CubeInstance) opt.params[0]; - String cubeName = (String) opt.params[1]; - String projectName = (String) opt.params[2]; - String modelName = srcCube.getDescriptor().getModelName(); - - String projectResPath = ProjectInstance.concatResourcePath(projectName); - Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class); - ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer); - - for (TableRef tableRef : srcCube.getModel().getAllTables()) { - project.addTable(tableRef.getTableIdentity()); - } + if (!project.getModels().contains(modelName)) + project.addModel(modelName); + project.removeRealization(RealizationType.CUBE, cubeName); + project.addRealizationEntry(RealizationType.CUBE, cubeName); - if (!project.getModels().contains(modelName)) - project.addModel(modelName); - project.removeRealization(RealizationType.CUBE, cubeName); - project.addRealizationEntry(RealizationType.CUBE, cubeName); + dstStore.checkAndPutResource(projectResPath, project, projectSerializer); + logger.info("Project instance for " + projectName + " is corrected"); + break; + } + case CLEAR_SEGMENTS: { + String cubeName = (String) opt.params[0]; + String cubeInstancePath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer); + cubeInstance.getSegments().clear(); + cubeInstance.clearCuboids(); + cubeInstance.setCreateTimeUTC(System.currentTimeMillis()); + cubeInstance.setStatus(RealizationStatusEnum.DISABLED); + dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer); + logger.info("Cleared segments for " + cubeName + ", since segments has not been copied"); + break; + } + case PURGE_AND_DISABLE: { + String cubeName = (String) opt.params[0]; + String cubeResPath = CubeInstance.concatResourcePath(cubeName); + Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); + CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer); + cube.getSegments().clear(); + cube.setStatus(RealizationStatusEnum.DISABLED); + srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer); + logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); - dstStore.checkAndPutResource(projectResPath, project, projectSerializer); - logger.info("Project instance for " + projectName + " is corrected"); - break; - } - case CLEAR_SEGMENTS: { - String cubeName = (String) opt.params[0]; - String cubeInstancePath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer); - cubeInstance.getSegments().clear(); - cubeInstance.clearCuboids(); - cubeInstance.setCreateTimeUTC(System.currentTimeMillis()); - cubeInstance.setStatus(RealizationStatusEnum.DISABLED); - dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer); - logger.info("Cleared segments for " + cubeName + ", since segments has not been copied"); - break; - } - case PURGE_AND_DISABLE: { - String cubeName = (String) opt.params[0]; - String cubeResPath = CubeInstance.concatResourcePath(cubeName); - Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class); - CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer); - cube.getSegments().clear(); - cube.setStatus(RealizationStatusEnum.DISABLED); - srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer); - logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl()); - - break; - } - default: { - //do nothing - break; - } + break; + } + default: { + //do nothing + break; + } } } @@ -582,53 +491,38 @@ public class CubeMigrationCLI extends AbstractApplication { logger.info("Undo operation: " + opt.toString()); switch (opt.type) { - case CHANGE_HTABLE_HOST: { - String tableName = (String) opt.params[0]; - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); - desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); - break; - } - case COPY_FILE_IN_META: { - // no harm - logger.info("Undo for COPY_FILE_IN_META is ignored"); - String item = (String) opt.params[0]; - - if (item.startsWith(ACL_PREFIX) && doAclCopy) { - logger.info("Remove acl record"); - dstStore.deleteResource(item); + case COPY_FILE_IN_META: { + // no harm + logger.info("Undo for COPY_FILE_IN_META is ignored"); + String item = (String) opt.params[0]; + + if (item.startsWith(ACL_PREFIX) && doAclCopy) { + logger.info("Remove acl record"); + dstStore.deleteResource(item); + } + break; } - break; - } - case COPY_DICT_OR_SNAPSHOT: { - // no harm - logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); - break; - } - case RENAME_FOLDER_IN_HDFS: { - String srcPath = (String) opt.params[1]; - String dstPath = (String) opt.params[0]; - - if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) { - renameHDFSPath(srcPath, dstPath); - logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath); + case COPY_DICT_OR_SNAPSHOT: { + // no harm + logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored"); + break; + } + case ADD_INTO_PROJECT: { + logger.info("Undo for ADD_INTO_PROJECT is ignored"); + break; + } + case PURGE_AND_DISABLE: { + logger.info("Undo for PURGE_AND_DISABLE is not supported"); + break; + } + case COPY_PARQUET_FILE: { + logger.info("Undo for COPY_PARQUET_FILE is ignored"); + break; + } + default: { + //do nothing + break; } - break; - } - case ADD_INTO_PROJECT: { - logger.info("Undo for ADD_INTO_PROJECT is ignored"); - break; - } - case PURGE_AND_DISABLE: { - logger.info("Undo for PURGE_AND_DISABLE is not supported"); - break; - } - default: { - //do nothing - break; - } } } @@ -660,17 +554,4 @@ public class CubeMigrationCLI extends AbstractApplication { } } } - - private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException { - int nRetry = 0; - int sleepTime = 5000; - while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) { - ++nRetry; - if (nRetry > 3) { - throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath); - } else { - Thread.sleep((long) sleepTime * nRetry * nRetry); - } - } - } -} +} \ No newline at end of file