This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new cd449ea  KYLIN-4817 Refine CubeMigrationCLI for kylin4
cd449ea is described below

commit cd449eab1c6f49a27bc97ecfff8e1b29af92aead
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Mon Nov 30 15:36:43 2020 +0800

    KYLIN-4817 Refine CubeMigrationCLI for kylin4
---
 .../apache/kylin/common/restclient/RestClient.java |   4 +-
 .../engine/spark/metadata/cube/PathManager.java    |   6 +
 .../org/apache/kylin/tool/CubeMigrationCLI.java    | 439 ++++++++-------------
 3 files changed, 168 insertions(+), 281 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 955b0ff..2e99809 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -202,7 +202,7 @@ public class RestClient {
 
     public String getKylinProperties() throws IOException {
         String url = baseUrl + "/admin/config";
-        HttpGet request = new HttpGet(url);
+        HttpGet request = newGet(url);
         HttpResponse response = null;
         try {
             response = client.execute(request);
@@ -380,7 +380,7 @@ public class RestClient {
     }
 
     private HttpGet newGet(String url) {
-        HttpGet get = new HttpGet();
+        HttpGet get = new HttpGet(url);
         addHttpHeaders(get);
         return get;
     }
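
The two RestClient changes above go together: getKylinProperties() now builds its request
through the newGet() helper so the common HTTP headers are applied, and newGet() finally
passes the URL into the HttpGet constructor (previously it created a request with no URI).
A minimal sketch of the corrected pattern, assuming Apache HttpClient 4.x; the base URL and
header values below are placeholders, not the real RestClient configuration:

    import org.apache.http.client.methods.HttpGet;

    public class RestClientGetSketch {
        // Assumed base URL; the real RestClient derives it from the configured host and port.
        private final String baseUrl = "http://localhost:7070/kylin/api";

        // Placeholder for the headers the real client attaches (auth, accept, etc.).
        private void addHttpHeaders(HttpGet get) {
            get.addHeader("Accept", "application/json, text/plain, */*");
        }

        // Fixed helper: the URL must reach the HttpGet constructor, otherwise the request has no URI.
        private HttpGet newGet(String url) {
            HttpGet get = new HttpGet(url);
            addHttpHeaders(get);
            return get;
        }

        // getKylinProperties() now goes through newGet(), so every GET carries the same headers.
        public HttpGet propertiesRequest() {
            return newGet(baseUrl + "/admin/config");
        }
    }
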
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 0484bfc..6444715 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -49,6 +49,12 @@ public final class PathManager {
         return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
     }
 
+    public static String getSegmentParquetStoragePath(String hdfsWorkDir, String cubeName, CubeSegment segment) {
+        String segmentName = segment.getName();
+        String identifier = segment.getStorageLocationIdentifier();
+        return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier;
+    }
+
     /**
      * Delete segment path
      */
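
The new overload mirrors the existing getSegmentParquetStoragePath() but takes the HDFS working
directory and cube name explicitly, so CubeMigrationCLI can build the same segment parquet path
against either the source or the destination cluster's working directory. A minimal sketch of how
the path is composed; the working directory and names below are illustrative, and it assumes
Kylin's working directory already ends with a separator, as the existing overload does:

    import java.io.File;

    public class SegmentPathSketch {
        // Same composition as getSegmentParquetStoragePath: <workDir>parquet/<cube>/<segmentName>_<identifier>
        static String segmentParquetPath(String hdfsWorkDir, String cubeName, String segmentName, String identifier) {
            return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier;
        }

        public static void main(String[] args) {
            // Illustrative values only.
            System.out.println(segmentParquetPath(
                    "hdfs://ns/kylin/kylin_metadata/my_project/",
                    "my_cube", "20200101000000_20200201000000", "1a2b3c4d"));
            // -> hdfs://ns/kylin/kylin_metadata/my_project/parquet/my_cube/20200101000000_20200201000000_1a2b3c4d
        }
    }
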
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 4612cef..550da0c 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -18,22 +18,20 @@
 
 package org.apache.kylin.tool;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -42,30 +40,23 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,14 +77,17 @@ public class CubeMigrationCLI extends AbstractApplication {
     protected KylinConfig dstConfig;
     protected ResourceStore srcStore;
     protected ResourceStore dstStore;
-    protected FileSystem hdfsFS;
-    private HBaseAdmin hbaseAdmin;
+    protected FileSystem hdfsFs;
+    protected Configuration conf;
     protected boolean doAclCopy = false;
     protected boolean doOverwrite = false;
     protected boolean doMigrateSegment = true;
     protected String dstProject;
+    protected String srcHdfsWorkDir;
+    protected String dstHdfsWorkDir;
 
     private static final String ACL_PREFIX = "/acl/";
+    private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
 
     public static void main(String[] args) throws IOException, InterruptedException {
 
@@ -125,7 +119,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute)
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -133,7 +127,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute)
             throws IOException, InterruptedException {
 
         moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
@@ -142,7 +136,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -152,7 +146,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
-            boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+                         boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
             throws IOException, InterruptedException {
         doAclCopy = copyAcl;
         doOverwrite = overwriteIfExists;
@@ -162,26 +156,24 @@ public class CubeMigrationCLI extends AbstractApplication {
         dstConfig = dstCfg;
         dstStore = ResourceStore.getStore(dstConfig);
         dstProject = projectName;
+        conf = HadoopUtil.getCurrentConfiguration();
 
         CubeManager cubeManager = CubeManager.getInstance(srcConfig);
         CubeInstance cube = cubeManager.getCube(cubeName);
+        srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject());
+        dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject);
         logger.info("cube to be moved is : " + cubeName);
 
         if (migrateSegment) {
             checkCubeState(cube);
         }
 
-        checkAndGetHbaseUrl();
+        checkAndGetMetadataUrl();
 
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        hbaseAdmin = new HBaseAdmin(conf);
-        hdfsFS = HadoopUtil.getWorkingFileSystem();
+        hdfsFs = HadoopUtil.getWorkingFileSystem();
         operations = new ArrayList<Opt>();
         copyFilesInMetaStore(cube);
-        if (migrateSegment) {
-            renameFoldersInHdfs(cube);
-            changeHtableHost(cube);
-        } else {
+        if (!migrateSegment) {
             clearSegments(cubeName); // this should be after copyFilesInMetaStore
         }
         addCubeAndModelIntoProject(cube, cubeName);
@@ -192,20 +184,12 @@ public class CubeMigrationCLI extends AbstractApplication {
 
         if (realExecute) {
             doOpts();
-            if (migrateSegment) {
-                checkMigrationSuccess(dstConfig, cubeName, true);
-            }
             updateMeta(dstConfig, projectName, cubeName, cube.getModel());
         } else {
             showOpts();
         }
     }
 
-    public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
-        CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
-        checkCLI.execute(cubeName);
-    }
-
     protected void checkCubeState(CubeInstance cube) {
         if (cube.getStatus() != RealizationStatusEnum.READY)
             throw new IllegalStateException("Cannot migrate cube that is not 
in READY state.");
@@ -217,40 +201,16 @@ public class CubeMigrationCLI extends AbstractApplication {
         }
     }
 
-    protected void checkAndGetHbaseUrl() {
+    protected void checkAndGetMetadataUrl() {
         StorageURL srcMetadataUrl = srcConfig.getMetadataUrl();
         StorageURL dstMetadataUrl = dstConfig.getMetadataUrl();
 
         logger.info("src metadata url is " + srcMetadataUrl);
         logger.info("dst metadata url is " + dstMetadataUrl);
-
-        if (!"hbase".equals(srcMetadataUrl.getScheme()) || 
!"hbase".equals(dstMetadataUrl.getScheme()))
-            throw new IllegalStateException("Both metadata urls should be 
hbase metadata url");
-    }
-
-    protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
-        for (CubeSegment segment : cube.getSegments()) {
-
-            String jobUuid = segment.getLastBuildJobID();
-            String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
-            String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
-
-            operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
-        }
-
-    }
-
-    protected void changeHtableHost(CubeInstance cube) {
-        if (cube.getDescriptor().getStorageType() != IStorageAware.ID_SHARDED_HBASE)
-            return;
-        for (CubeSegment segment : cube.getSegments()) {
-            operations
-                    .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
-        }
     }
 
     protected void clearSegments(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}));
     }
 
     protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
@@ -260,16 +220,22 @@ public class CubeMigrationCLI extends AbstractApplication {
                     + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
 
         List<String> metaItems = new ArrayList<String>();
+        List<String> srcParquetFiles = new ArrayList<String>();
+        List<String> dstParquetFiles = new ArrayList<String>();
         Set<String> dictAndSnapshot = new HashSet<String>();
-        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+        listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles);
 
         for (String item : metaItems) {
-            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}));
         }
 
         if (doMigrateSegment) {
             for (String item : dictAndSnapshot) {
-                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}));
+            }
+
+            for (int i = 0; i < srcParquetFiles.size(); i++) {
+                operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}));
             }
         }
     }
@@ -279,11 +245,11 @@ public class CubeMigrationCLI extends AbstractApplication {
         if (!dstStore.exists(projectResPath))
             throw new IllegalStateException("The target project " + dstProject + " does not exist");
 
-        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, dstProject }));
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject}));
     }
 
     private void purgeAndDisable(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
+        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}));
     }
 
     private List<String> getCompatibleTablePath(Set<TableRef> tableRefs, String project, String rootPath)
@@ -311,7 +277,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         return toResource;
     }
 
-    protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot)
+    protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot, List<String> srcParquetFiles, List<String> dstParquetFiles)
             throws IOException {
 
         CubeDesc cubeDesc = cube.getDescriptor();
@@ -326,10 +292,25 @@ public class CubeMigrationCLI extends AbstractApplication {
         metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
 
         if (doMigrateSegment) {
+            for (DictionaryDesc dictionaryDesc : cubeDesc.getDictionaries()) {
+                String[] columnInfo = dictionaryDesc.getColumnRef().getColumnWithTable().split("\\.");
+                String globalDictPath;
+                if (columnInfo.length == 3) {
+                    globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[1] + File.separator + columnInfo[2];
+                } else {
+                    globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[0] + File.separator + columnInfo[1];
+                }
+                if (globalDictPath != null) {
+                    logger.info("Add " + globalDictPath + " to migrate dict list");
+                    dictAndSnapshot.add(globalDictPath);
+                }
+            }
             for (CubeSegment segment : cube.getSegments()) {
                 metaResource.add(segment.getStatisticsResourcePath());
                 dictAndSnapshot.addAll(segment.getSnapshotPaths());
-                dictAndSnapshot.addAll(segment.getDictionaryPaths());
+                srcParquetFiles.add(PathManager.getSegmentParquetStoragePath(srcHdfsWorkDir, cube.getName(), segment));
+                dstParquetFiles.add(PathManager.getSegmentParquetStoragePath(dstHdfsWorkDir, cube.getName(), segment));
+                logger.info("Add " + PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()) + " to migrate parquet file list");
             }
         }
 
@@ -337,11 +318,6 @@ public class CubeMigrationCLI extends AbstractApplication {
             metaResource.add(ACL_PREFIX + cube.getUuid());
             metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
         }
-
-//        if (cubeDesc.isStreamingCube()) {
-//            // add streaming source config info for streaming cube
-//            metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
-//        }
     }
 
     @Override
@@ -355,7 +331,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     protected enum OptType {
-        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS
     }
 
     protected void addOpt(OptType type, Object[] params) {
@@ -420,161 +396,94 @@ public class CubeMigrationCLI extends AbstractApplication {
         logger.info("Executing operation: " + opt.toString());
 
         switch (opt.type) {
-        case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
-            desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
-            logger.info("CHANGE_HTABLE_HOST is completed");
-            break;
-        }
-        case COPY_FILE_IN_META: {
-            String item = (String) opt.params[0];
-            RawResource res = srcStore.getResource(item);
-            if (res == null) {
-                logger.info("Item: {} doesn't exist, ignore it.", item);
+            case COPY_FILE_IN_META: {
+                String item = (String) opt.params[0];
+                RawResource res = srcStore.getResource(item);
+                if (res == null) {
+                    logger.info("Item: {} doesn't exist, ignore it.", item);
+                    break;
+                }
+                dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
+                res.content().close();
+                logger.info("Item " + item + " is copied");
                 break;
             }
-            dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
-            res.content().close();
-            logger.info("Item " + item + " is copied");
-            break;
-        }
-        case COPY_DICT_OR_SNAPSHOT: {
-            String item = (String) opt.params[0];
-
-            if (item.toLowerCase(Locale.ROOT).endsWith(".dict")) {
-                DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
-                DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
-                DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
-                long ts = dictSrc.getLastModified();
-                dictSrc.setLastModified(0);//to avoid resource store write conflict
-                Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
-                DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
-                dictSrc.setLastModified(ts);
-
-                if (dictSaved == dictSrc) {
-                    //no dup found, already saved to dest
-                    logger.info("Item " + item + " is copied");
+            case COPY_DICT_OR_SNAPSHOT: {
+                String item = (String) opt.params[0];
+                String itemPath = item.substring(item.substring(0, item.indexOf("/")).length()+1);
+                Path srcPath = new Path(srcHdfsWorkDir + itemPath);
+                Path dstPath = new Path(dstHdfsWorkDir + itemPath);
+                if (hdfsFs.exists(srcPath)) {
+                    FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+                    logger.info("Copy " + srcPath + " to " + dstPath);
                 } else {
-                    //dictSrc is rejected because of duplication
-                    //modify cube's dictionary path
-                    String cubeName = (String) opt.params[1];
-                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-                    CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
-                    for (CubeSegment segment : cube.getSegments()) {
-                        for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
-                            if (entry.getValue().equalsIgnoreCase(item)) {
-                                entry.setValue(dictSaved.getResourcePath());
-                            }
-                        }
-                    }
-                    dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-                    logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+                    logger.info("Dict or snapshot " + srcPath + " is not exists, ignore it");
                 }
-
-            } else if (item.toLowerCase(Locale.ROOT).endsWith(".snapshot")) {
-                SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
-                SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
-                SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
-                long ts = snapSrc.getLastModified();
-                snapSrc.setLastModified(0);
-                SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
-                snapSrc.setLastModified(ts);
-
-                if (snapSaved == snapSrc) {
-                    //no dup found, already saved to dest
-                    logger.info("Item " + item + " is copied");
-
+                break;
+            }
+            case COPY_PARQUET_FILE: {
+                Path srcPath = new Path((String) opt.params[0]);
+                Path dstPath = new Path((String) opt.params[1]);
+                if (hdfsFs.exists(srcPath)) {
+                    FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+                    logger.info("Copy " + srcPath + " to " + dstPath);
                 } else {
-                    String cubeName = (String) opt.params[1];
-                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-                    CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
-                    for (CubeSegment segment : cube.getSegments()) {
-                        for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
-                            if (entry.getValue().equalsIgnoreCase(item)) {
-                                entry.setValue(snapSaved.getResourcePath());
-                            }
-                        }
-                    }
-                    dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-                    logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
+                    logger.info("Parquet file " + srcPath + " is not exists, ignore it");
                 }
-
-            } else {
-                logger.error("unknown item found: " + item);
-                logger.info("ignore it");
+                break;
             }
+            case ADD_INTO_PROJECT: {
+                CubeInstance srcCube = (CubeInstance) opt.params[0];
+                String cubeName = (String) opt.params[1];
+                String projectName = (String) opt.params[2];
+                String modelName = srcCube.getDescriptor().getModelName();
+
+                String projectResPath = ProjectInstance.concatResourcePath(projectName);
+                Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+                ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
+
+                for (TableRef tableRef : srcCube.getModel().getAllTables()) {
+                    project.addTable(tableRef.getTableIdentity());
+                }
 
-            break;
-        }
-        case RENAME_FOLDER_IN_HDFS: {
-            String srcPath = (String) opt.params[0];
-            String dstPath = (String) opt.params[1];
-            renameHDFSPath(srcPath, dstPath);
-            logger.info("HDFS Folder renamed from " + srcPath + " to " + 
dstPath);
-            break;
-        }
-        case ADD_INTO_PROJECT: {
-            CubeInstance srcCube = (CubeInstance) opt.params[0];
-            String cubeName = (String) opt.params[1];
-            String projectName = (String) opt.params[2];
-            String modelName = srcCube.getDescriptor().getModelName();
-
-            String projectResPath = ProjectInstance.concatResourcePath(projectName);
-            Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
-            ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
-
-            for (TableRef tableRef : srcCube.getModel().getAllTables()) {
-                project.addTable(tableRef.getTableIdentity());
-            }
+                if (!project.getModels().contains(modelName))
+                    project.addModel(modelName);
+                project.removeRealization(RealizationType.CUBE, cubeName);
+                project.addRealizationEntry(RealizationType.CUBE, cubeName);
 
-            if (!project.getModels().contains(modelName))
-                project.addModel(modelName);
-            project.removeRealization(RealizationType.CUBE, cubeName);
-            project.addRealizationEntry(RealizationType.CUBE, cubeName);
+                dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
+                logger.info("Project instance for " + projectName + " is corrected");
+                break;
+            }
+            case CLEAR_SEGMENTS: {
+                String cubeName = (String) opt.params[0];
+                String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+                Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
+                cubeInstance.getSegments().clear();
+                cubeInstance.clearCuboids();
+                cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+                cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+                dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+                logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
+                break;
+            }
+            case PURGE_AND_DISABLE: {
+                String cubeName = (String) opt.params[0];
+                String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
+                cube.getSegments().clear();
+                cube.setStatus(RealizationStatusEnum.DISABLED);
+                srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
+                logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
 
-            dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
-            logger.info("Project instance for " + projectName + " is corrected");
-            break;
-        }
-        case CLEAR_SEGMENTS: {
-            String cubeName = (String) opt.params[0];
-            String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
-            Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-            CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
-            cubeInstance.getSegments().clear();
-            cubeInstance.clearCuboids();
-            cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
-            cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
-            dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
-            logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
-            break;
-        }
-        case PURGE_AND_DISABLE: {
-            String cubeName = (String) opt.params[0];
-            String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-            Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-            CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
-            cube.getSegments().clear();
-            cube.setStatus(RealizationStatusEnum.DISABLED);
-            srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-            logger.info("Cube " + cubeName + " is purged and disabled in " + 
srcConfig.getMetadataUrl());
-
-            break;
-        }
-        default: {
-            //do nothing
-            break;
-        }
+                break;
+            }
+            default: {
+                //do nothing
+                break;
+            }
         }
     }
 
@@ -582,53 +491,38 @@ public class CubeMigrationCLI extends AbstractApplication {
         logger.info("Undo operation: " + opt.toString());
 
         switch (opt.type) {
-        case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
-            desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
-            break;
-        }
-        case COPY_FILE_IN_META: {
-            // no harm
-            logger.info("Undo for COPY_FILE_IN_META is ignored");
-            String item = (String) opt.params[0];
-
-            if (item.startsWith(ACL_PREFIX) && doAclCopy) {
-                logger.info("Remove acl record");
-                dstStore.deleteResource(item);
+            case COPY_FILE_IN_META: {
+                // no harm
+                logger.info("Undo for COPY_FILE_IN_META is ignored");
+                String item = (String) opt.params[0];
+
+                if (item.startsWith(ACL_PREFIX) && doAclCopy) {
+                    logger.info("Remove acl record");
+                    dstStore.deleteResource(item);
+                }
+                break;
             }
-            break;
-        }
-        case COPY_DICT_OR_SNAPSHOT: {
-            // no harm
-            logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
-            break;
-        }
-        case RENAME_FOLDER_IN_HDFS: {
-            String srcPath = (String) opt.params[1];
-            String dstPath = (String) opt.params[0];
-
-            if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
-                renameHDFSPath(srcPath, dstPath);
-                logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+            case COPY_DICT_OR_SNAPSHOT: {
+                // no harm
+                logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+                break;
+            }
+            case ADD_INTO_PROJECT: {
+                logger.info("Undo for ADD_INTO_PROJECT is ignored");
+                break;
+            }
+            case PURGE_AND_DISABLE: {
+                logger.info("Undo for PURGE_AND_DISABLE is not supported");
+                break;
+            }
+            case COPY_PARQUET_FILE: {
+                logger.info("Undo for COPY_PARQUET_FILE is ignored");
+                break;
+            }
+            default: {
+                //do nothing
+                break;
             }
-            break;
-        }
-        case ADD_INTO_PROJECT: {
-            logger.info("Undo for ADD_INTO_PROJECT is ignored");
-            break;
-        }
-        case PURGE_AND_DISABLE: {
-            logger.info("Undo for PURGE_AND_DISABLE is not supported");
-            break;
-        }
-        default: {
-            //do nothing
-            break;
-        }
         }
     }
 
@@ -660,17 +554,4 @@ public class CubeMigrationCLI extends AbstractApplication {
             }
         }
     }
-
-    private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException {
-        int nRetry = 0;
-        int sleepTime = 5000;
-        while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) {
-            ++nRetry;
-            if (nRetry > 3) {
-                throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath);
-            } else {
-                Thread.sleep((long) sleepTime * nRetry * nRetry);
-            }
-        }
-    }
-}
+}
\ No newline at end of file
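
Taken together, the CubeMigrationCLI changes drop the HBase-specific steps (CHANGE_HTABLE_HOST,
RENAME_FOLDER_IN_HDFS, the dictionary and snapshot managers) and instead copy each segment's
parquet folder, plus global dictionaries and snapshots, between the source and destination HDFS
working directories. A minimal sketch of that copy step, assuming both working directories are
reachable from a single FileSystem as in the CLI above; the paths below are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class ParquetCopySketch {
        // Copies src to dst if src exists; deleteSource=false keeps the source cluster intact,
        // overwrite=true matches what the CLI passes to FileUtil.copy.
        static void copyIfPresent(FileSystem fs, Configuration conf, String src, String dst) throws Exception {
            Path srcPath = new Path(src);
            Path dstPath = new Path(dst);
            if (fs.exists(srcPath)) {
                FileUtil.copy(fs, srcPath, fs, dstPath, false, true, conf);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // Illustrative segment parquet path, as produced by PathManager.getSegmentParquetStoragePath.
            copyIfPresent(fs, conf,
                    "hdfs://ns/kylin/src_working_dir/my_project/parquet/my_cube/seg_1a2b3c4d",
                    "hdfs://ns/kylin/dst_working_dir/my_project/parquet/my_cube/seg_1a2b3c4d");
        }
    }
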
