This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 4572cf4 add more check rules for migration streaming cube 4572cf4 is described below commit 4572cf488c20c383b8f5f6e11bf395c1fde4cb49 Author: kliu3 <liu...@apache.org> AuthorDate: Wed Apr 7 15:55:21 2021 +0800 add more check rules for migration streaming cube --- .../apache/kylin/common/restclient/RestClient.java | 4 ++ .../apache/kylin/metadata/model/ColumnDesc.java | 4 ++ .../org/apache/kylin/metadata/model/TableDesc.java | 4 ++ .../kylin/rest/controller/MigrationController.java | 42 +++++++++++++++ .../kylin/rest/service/MigrationRuleSet.java | 60 ++++++++++++++++++++-- .../kylin/rest/service/MigrationService.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 5 ++ .../kylin/rest/service/StreamingV2Service.java | 22 ++++++++ .../rest/service/TableSchemaUpdateChecker.java | 38 +++++++++++++- .../apache/kylin/rest/service/TableService.java | 12 ++++- server/src/main/resources/kylinSecurity.xml | 2 + .../stream/core/source/MessageParserInfo.java | 17 ++++++ .../org/apache/kylin/tool/CubeMigrationCLI.java | 13 ++++- .../StreamTableCompatibilityCheckRequest.java | 49 ++++++++++++++++++ 14 files changed, 266 insertions(+), 8 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index fcd8706..a054c42 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -393,6 +393,10 @@ public class RestClient { checkCompatibility(jsonRequest, baseUrl + "/cubes/checkCompatibility"); } + public void checkStreamTableCompatibility(String jsonRequest) throws IOException { + checkCompatibility(jsonRequest, baseUrl+"/cubes/checkStreamTableCompatibility"); + } + private void checkCompatibility(String jsonRequest, String url) throws IOException { HttpPost post = newPost(url); try { diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java index 455f586..7ec5bb3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.relaxng.datatype.Datatype; /** * Column Metadata from Source. All name should be uppercase. @@ -158,6 +159,9 @@ public class ColumnDesc implements Serializable { } public DataType getType() { + if (type == null && datatype != null) { + this.type = DataType.getType(datatype); + } return type; } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index d99ff54..9f701dc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -456,4 +456,8 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { return false; } + public boolean isLambdaTable() { + return sourceType == ISourceAware.ID_KAFKA_HIVE; + } + } diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java index 45f83ea..58860c5 100644 --- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java +++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java @@ -35,8 +35,11 @@ import org.apache.kylin.rest.service.MigrationRuleSet; import org.apache.kylin.rest.service.MigrationService; import org.apache.kylin.rest.service.ModelService; import org.apache.kylin.rest.service.QueryService; +import org.apache.kylin.rest.service.StreamingV2Service; import org.apache.kylin.rest.service.TableService; +import org.apache.kylin.stream.core.source.StreamingSourceConfig; import org.apache.kylin.tool.migration.CompatibilityCheckRequest; +import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -73,6 +76,9 @@ public class MigrationController extends BasicController { @Autowired private TableService tableService; + @Autowired + private StreamingV2Service streamingV2Service; + private final String targetHost = KylinConfig.getInstanceFromEnv().getMigrationTargetAddress(); private CubeInstance getCubeInstance(String cubeName) { @@ -140,6 +146,38 @@ public class MigrationController extends BasicController { return Strings.isNullOrEmpty(targetHost) ? this.targetHost : targetHost; } + @RequestMapping(value = "/checkStreamTableCompatibility", method = { RequestMethod.POST }) + @ResponseBody + public void checkStreamTableCompatibility(@RequestBody StreamTableCompatibilityCheckRequest request) { + TableDesc tableDesc = null; + try { + tableDesc = JsonUtil.readValue(request.getTableDesc(), TableDesc.class); + // check table desc + logger.info("Stream table compatibility check for table {}, project {}", + tableDesc.getName(), tableDesc.getProject()); + tableService.checkStreamTableCompatibility(request.getProjectName(), tableDesc); + logger.info("Pass stream table compatibility check for table {}, project {}", + tableDesc.getName(), tableDesc.getProject()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new ConflictException(e.getMessage(), e); + } + + // check stream source config + StreamingSourceConfig config = null; + try { + config = JsonUtil.readValue(request.getStreamSource(), StreamingSourceConfig.class); + logger.info("Stream source config compatibility check for table {}, project {}", + tableDesc.getName(), tableDesc.getProject()); + streamingV2Service.checkStreamingSourceCompatibility(request.getProjectName(), config); + logger.info("Pass stream source config compatibility check for table {}, project {}", + tableDesc.getName(), tableDesc.getProject()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new ConflictException(e.getMessage(), e); + } + } + /** * Check the schema compatibility for table, model desc */ @@ -199,4 +237,8 @@ public class MigrationController extends BasicController { } return result; } + + private boolean isStreamingTable(CubeInstance cube) { + return cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable(); + } } diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java index de158bd..ab67643 100644 --- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java +++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationRuleSet.java @@ -55,7 +55,10 @@ import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourceManager; +import org.apache.kylin.stream.core.source.StreamingSourceConfig; +import org.apache.kylin.stream.core.source.StreamingSourceConfigManager; import org.apache.kylin.tool.migration.CompatibilityCheckRequest; +import org.apache.kylin.tool.migration.StreamTableCompatibilityCheckRequest; import org.apache.kylin.tool.query.QueryGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +89,7 @@ public class MigrationRuleSet { public static final Rule DEFAULT_SEGMENT_RULE = new SegmentRule(); public static final Rule DEFAULT_CUBE_OVERWRITE_RULE = new CubeOverwriteRule(); public static final Rule DEFAULT_QUERY_LATENCY_RULE = new QueryLatencyRule(); + public static final Rule DEFAULT_STREAM_TABLE_CHECK_RULE = new StreamTableCompatibilityRule(); private static List<Rule> MUSTTOPASS_RULES = Lists.newLinkedList(); @@ -114,7 +118,9 @@ public class MigrationRuleSet { // initialize default rules static { register(DEFAULT_HIVE_TABLE_CONSISTENCY_RULE, DEFAULT_CUBE_STATUS_RULE, DEFAULT_PROJECT_EXIST_RULE, - DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE, DEFAULT_COMPATIBLE_RULE); + DEFAULT_EMAIL_NOTIFY_RULE, DEFAULT_SEGMENT_RULE, DEFAULT_CUBE_OVERWRITE_RULE, + DEFAULT_COMPATIBLE_RULE, DEFAULT_STREAM_TABLE_CHECK_RULE); + register(false, DEFAULT_AUTO_MERGE_RULE, DEFAULT_EXPANSION_RULE, DEFAULT_QUERY_LATENCY_RULE); } @@ -369,6 +375,17 @@ public class MigrationRuleSet { @Override public void apply(Context ctx) throws RuleValidationException { + // check table type + CubeInstance cube = ctx.getCubeInstance(); + boolean isStreamTable = cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable(); + if (isStreamTable) { + // check lambda + if (!cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isLambdaTable()) { + // streaming table without lambda doesn't need to check hive table + return; + } + } + // de-dup SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); for (TableRef tableRef : ctx.getCubeInstance().getModel().getAllTables()) { @@ -400,7 +417,8 @@ public class MigrationRuleSet { // do schema check KylinConfig config = KylinConfig.getInstanceFromEnv(); TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(TableMetadataManager.getInstance(config), - CubeManager.getInstance(config), DataModelManager.getInstance(config)); + CubeManager.getInstance(config), DataModelManager.getInstance(config), + StreamingSourceConfigManager.getInstance(config)); for (Pair<TableDesc, TableExtDesc> pair : allMeta) { try { TableSchemaUpdateChecker.CheckResult result = checker.allowReload(pair.getFirst(), @@ -417,6 +435,42 @@ public class MigrationRuleSet { } + private static class StreamTableCompatibilityRule implements Rule { + + @Override + public void apply(Context ctx) throws RuleValidationException { + try { + checkStreamTableSchema(ctx); + } catch (IOException e) { + throw new RuleValidationException(e.getMessage(), e); + } + } + + public void checkStreamTableSchema(Context ctx) throws IOException { + // check stream kylin table + TableDesc tableDesc = ctx.getCubeInstance().getModel().getRootFactTable().getTableDesc(); + if (!tableDesc.isStreamingTable()) { + return; + } + logger.info("check the stream table schema, cubename {}, project {}, lambda {}", + ctx.cubeInstance.getName(), ctx.cubeInstance.getProject(), tableDesc.isLambdaTable()); + + // get stream source config + StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager + .getInstance(ctx.cubeInstance.getConfig()) + .getConfig(tableDesc.getIdentity(), tableDesc.getProject()); + + StreamTableCompatibilityCheckRequest streamRequest = new StreamTableCompatibilityCheckRequest(); + streamRequest.setProjectName(ctx.getTgtProjectName()); + streamRequest.setTableDesc(JsonUtil.writeValueAsIndentString(tableDesc)); + streamRequest.setStreamSource(JsonUtil.writeValueAsIndentString(streamingSourceConfig)); + + String jsonRequest = JsonUtil.writeValueAsIndentString(streamRequest); + RestClient client = new RestClient(ctx.getTargetAddress()); + client.checkStreamTableCompatibility(jsonRequest); + } + } + public static class Context { private final QueryService queryService; private final CubeInstance cubeInstance; @@ -466,4 +520,4 @@ public class MigrationRuleSet { return srcProjectName; } } -} \ No newline at end of file +} diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java index 67fce7e..31c4ed4 100644 --- a/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java +++ b/cube-migration/src/main/java/org/apache/kylin/rest/service/MigrationService.java @@ -101,7 +101,7 @@ public class MigrationService extends BasicService { String projectName = ctx.getTgtProjectName(); try { sendApprovedMailQuietly(cubeName, projectName); - + logger.info("migration approved, cube {}, project {}", cubeName, projectName); // do cube migration new CubeMigrationCLI().moveCube(localHost, ctx.getTargetAddress(), cubeName, projectName, "true", "false", "true", "true", "false"); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java index 9ac2602..b5a805b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -34,6 +34,7 @@ import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.storage.hybrid.HybridManager; +import org.apache.kylin.stream.core.source.StreamingSourceConfigManager; public abstract class BasicService { @@ -63,6 +64,10 @@ public abstract class BasicService { return StreamingManager.getInstance(getConfig()); } + public StreamingSourceConfigManager getStreamingSourceConfigManager() { + return StreamingSourceConfigManager.getInstance(getConfig()); + } + public KafkaConfigManager getKafkaManager() throws IOException { return KafkaConfigManager.getInstance(getConfig()); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index 7cb85c6..e66eee9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -23,7 +23,9 @@ import java.net.InetAddress; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -105,6 +107,26 @@ public class StreamingV2Service extends BasicService { receiverAdminClient = adminClient; } + public void checkStreamingSourceCompatibility(final String prj, final StreamingSourceConfig streamingSourceConfig) throws Exception { + StreamingSourceConfig existing = getStreamingManagerV2(). + getConfig(streamingSourceConfig.getName(), streamingSourceConfig.getProjectName()); + if (existing == null) { + return; + } else if (!Objects.equals(streamingSourceConfig.getParserInfo(), existing.getParserInfo())) { + logger.info("stream source parse info compatibility check, source {}, target {}", + streamingSourceConfig.getParserInfo(), existing.getParserInfo()); + throw new Exception(String.format(Locale.ROOT, + "the stream source parse info is not compatible, name %s, project %s", + streamingSourceConfig.getName(), streamingSourceConfig.getProjectName())); + } else if (!Objects.equals(streamingSourceConfig.getProperties(), existing.getProperties())) { + logger.info("stream source properties compatibility check, source {}, target {}", + streamingSourceConfig.getProperties(), existing.getProperties()); + throw new Exception(String.format(Locale.ROOT, + "the stream source properties are not compatible, name %s, project %s", + streamingSourceConfig.getName(), streamingSourceConfig.getProjectName())); + } + } + public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException { List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList(); if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java index 3a45f49..a1318c2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java @@ -45,11 +45,13 @@ import org.apache.kylin.shaded.com.google.common.collect.ImmutableList; import org.apache.kylin.shaded.com.google.common.collect.Iterables; import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Sets; +import org.apache.kylin.stream.core.source.StreamingSourceConfigManager; public class TableSchemaUpdateChecker { private final TableMetadataManager metadataManager; private final CubeManager cubeManager; private final DataModelManager dataModelManager; + private StreamingSourceConfigManager streamingSourceConfigManager; public static class CheckResult { private final boolean valid; @@ -75,6 +77,11 @@ public class TableSchemaUpdateChecker { format(Locale.ROOT, "Table '%s' is compatible with all existing cubes", tableName)); } + static CheckResult validOnStreamTableDescCompatible(String tablename) { + return new CheckResult(true, + format(Locale.ROOT, "Stream table '%s' is compatible with existing stream table", tablename)); + } + static CheckResult invalidOnFetchSchema(String tableName, Exception e) { return new CheckResult(false, format(Locale.ROOT, "Failed to fetch metadata of '%s': %s", tableName, e.getMessage())); @@ -91,12 +98,22 @@ public class TableSchemaUpdateChecker { "Found %d issue(s) with '%s':%n%s Please disable and " + "purge related " + "cube(s) first", reasons.size(), tableName, buf.toString())); } + + static CheckResult invalidOnStreamTableCompatible(String tablename, String reason) { + return new CheckResult( + false, + format(Locale.ROOT, "Stream table is incompatible, the reason is %s", reason)); + } } - TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) { + TableSchemaUpdateChecker( + TableMetadataManager metadataManager, CubeManager cubeManager, + DataModelManager dataModelManager, StreamingSourceConfigManager streamingSourceConfigManager) { this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null"); + this.streamingSourceConfigManager = + checkNotNull(streamingSourceConfigManager, "streamingSourceConfigManager is null"); } private List<CubeInstance> findCubeByTable(final TableDesc table) { @@ -186,6 +203,25 @@ public class TableSchemaUpdateChecker { return true; } + public CheckResult streamTableCheckCompatibility(TableDesc newTableDesc, String prj) { + final String fullTableName = newTableDesc.getIdentity(); + TableDesc existing = metadataManager.getTableDesc(fullTableName, prj); + // check the table desc + // 1. check source type + // 2. check all columns + if (existing == null) { + return CheckResult.validOnFirstLoad(fullTableName); + } else if (newTableDesc.getSourceType() != existing.getSourceType()) { + String reason = format(Locale.ROOT, "the source type is %s, the target type is %s", + newTableDesc.getSourceType(), existing.getSourceType()); + return CheckResult.invalidOnStreamTableCompatible(fullTableName, reason); + } else if (!checkAllColumnsInTableDesc(newTableDesc, existing)) { + String reason = "the columns are incompatible"; + return CheckResult.invalidOnStreamTableCompatible(fullTableName, reason); + } + return CheckResult.validOnStreamTableDescCompatible(fullTableName); + } + public CheckResult allowReload(TableDesc newTableDesc, String prj) { final String fullTableName = newTableDesc.getIdentity(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 2d76acc..9d20766 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -124,7 +124,12 @@ public class TableService extends BasicService { private AclEvaluate aclEvaluate; public TableSchemaUpdateChecker getSchemaUpdateChecker() { - return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager()); + return new TableSchemaUpdateChecker(getTableManager(), getCubeManager(), getDataModelManager(), getStreamingSourceConfigManager()); + } + + public void checkStreamTableCompatibility(String project, TableDesc tableDesc) { + TableSchemaUpdateChecker.CheckResult result = getSchemaUpdateChecker().streamTableCheckCompatibility(tableDesc, project); + result.raiseExceptionWhenInvalid(); } public void checkTableCompatibility(String prj, TableDesc tableDesc) { @@ -201,7 +206,10 @@ public class TableService extends BasicService { // do schema check TableMetadataManager metaMgr = getTableManager(); CubeManager cubeMgr = getCubeManager(); - TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager()); + TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, + cubeMgr, + getDataModelManager(), + getStreamingSourceConfigManager()); for (Pair<TableDesc, TableExtDesc> pair : allMeta) { TableDesc tableDesc = pair.getFirst(); TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project); diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml index 36376de..2fc1b83 100644 --- a/server/src/main/resources/kylinSecurity.xml +++ b/server/src/main/resources/kylinSecurity.xml @@ -238,6 +238,7 @@ <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/> <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/> + <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/> <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/> @@ -291,6 +292,7 @@ <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> <scr:intercept-url pattern="/api/cubes/checkCompatibility" access="permitAll"/> <scr:intercept-url pattern="/api/cubes/checkCompatibility/hiveTable" access="permitAll"/> + <scr:intercept-url pattern="/api/cubes/checkStreamTableCompatibility" access="permitAll"/> <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/> diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java index 4070ae6..fa21ed1 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/MessageParserInfo.java @@ -19,6 +19,7 @@ package org.apache.kylin.stream.core.source; import java.util.Map; +import java.util.Objects; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; @@ -80,4 +81,20 @@ public class MessageParserInfo { this.columnToSourceFieldMapping = columnToSourceFieldMapping; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MessageParserInfo that = (MessageParserInfo) o; + return formatTs == that.formatTs && + Objects.equals(tsColName, that.tsColName) && + Objects.equals(tsParser, that.tsParser) && + Objects.equals(tsPattern, that.tsPattern) && + Objects.equals(columnToSourceFieldMapping, that.columnToSourceFieldMapping); + } + + @Override + public int hashCode() { + return Objects.hash(tsColName, tsParser, tsPattern, formatTs, columnToSourceFieldMapping); + } } diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index 0c3d6e9..0295bea 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -172,7 +172,14 @@ public class CubeMigrationCLI extends AbstractApplication { CubeManager cubeManager = CubeManager.getInstance(srcConfig); CubeInstance cube = cubeManager.getCube(cubeName); - logger.info("cube to be moved is : " + cubeName); + logger.info("cube to be moved is {}, project is {}", cubeName, projectName); + if (cube.getDescriptor().getModel().getRootFactTable().getTableDesc().isStreamingTable()) { + logger.info("move streaming cube, project: {}, cube name {}", projectName, cubeName); + if (migrateSegment) { + throw new InterruptedException("Can't migrate stream cube with data"); + } + } + logger.info("cube to be moved is {}, project is {}, the real execute is {}", cubeName, projectName, realExecute); if (migrateSegment) { checkCubeState(cube); @@ -205,6 +212,7 @@ public class CubeMigrationCLI extends AbstractApplication { updateMeta(dstConfig, projectName, cubeName, cube.getModel()); updateMeta(srcConfig, cube.getProject(), cubeName, cube.getModel()); } else { + logger.info("show operations for cube {}, project {}", cubeName, cube.getProject()); showOpts(); } } @@ -359,6 +367,8 @@ public class CubeMigrationCLI extends AbstractApplication { metaResource.add(ACL_PREFIX + cube.getModel().getUuid()); } + // if the cube is a stream cube, and add the stream source config + // streaming cube just support one fact table if (cubeDesc.isStreamingCube()) { // add streaming source config info for streaming cube String tableName = cubeDesc.getModel().getRootFactTableName(); @@ -694,6 +704,7 @@ public class CubeMigrationCLI extends AbstractApplication { return srcItem; } + private void updateMeta(KylinConfig config, String projectName, String cubeName, DataModelDesc model) { String[] nodes = config.getRawRestServers(); Map<String, String> tableToProjects = new HashMap<>(); diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java new file mode 100644 index 0000000..0262bb7 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/migration/StreamTableCompatibilityCheckRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.migration; + +public class StreamTableCompatibilityCheckRequest { + private String projectName; + private String tableDesc; + private String streamSource; + + public void setProjectName(String projectName) { + this.projectName = projectName; + } + + public void setTableDesc(String tableDesc) { + this.tableDesc = tableDesc; + } + + public void setStreamSource(String streamSource) { + this.streamSource = streamSource; + } + + public String getProjectName() { + return projectName; + } + + public String getTableDesc() { + return tableDesc; + } + + public String getStreamSource() { + return streamSource; + } +}