This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new decb4ab KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604) decb4ab is described below commit decb4ab7ed3fa0d8fbf0bfd7544676d139ff8948 Author: Kun Liu <liu...@apache.org> AuthorDate: Thu Mar 18 16:51:08 2021 +0800 KYLIN-4921: New rowkey for streamv2 source config and fix bug that can't create same name table in diff project (#1604) * support diff project with same stream name * fix bug: when create duplicated table, the original stream table can't be deleted --- .../kylin/realtime/BuildCubeWithStreamV2.java | 3 +- .../rest/controller/StreamingV2Controller.java | 35 +++++++-- .../kylin/rest/controller/TableController.java | 2 +- .../apache/kylin/rest/service/ProjectService.java | 2 +- .../kylin/rest/service/StreamingV2Service.java | 12 +-- .../apache/kylin/rest/service/TableService.java | 7 +- .../stream/core/source/StreamingSourceConfig.java | 32 +++++++- .../core/source/StreamingSourceConfigManager.java | 88 ++++++++++++++++------ .../kylin/stream/server/StreamingServer.java | 2 +- .../source/kafka/KafkaBatchSourceAdaptor.java | 2 +- .../kylin/stream/source/kafka/KafkaSource.java | 8 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 15 +++- .../kylin/tool/extractor/CubeMetaExtractor.java | 17 +---- webapp/app/js/controllers/sourceMeta.js | 7 +- 14 files changed, 168 insertions(+), 64 deletions(-) diff --git a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java index fb85b26..7f2c446 100644 --- a/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java +++ b/kylin-it/src/test/java/org/apache/kylin/realtime/BuildCubeWithStreamV2.java @@ -160,7 +160,8 @@ public class BuildCubeWithStreamV2 extends KylinTestBase { final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME); final String streamingTableName = cubeInstance.getRootFactTable(); - final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName); + final String projectName = cubeInstance.getProject(); + final StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(streamingTableName, projectName); topicName = KafkaSource.getTopicName(sourceConfig.getProperties()); String bootstrapServers = KafkaSource.getBootstrapServers(sourceConfig.getProperties()); diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java index 0a0cc62..a73a88c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.UUID; import org.apache.commons.lang.StringUtils; +import org.apache.directory.api.util.Strings; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -104,10 +105,21 @@ public class StreamingV2Controller extends BasicController { @RequestMapping(value = "/getConfig", method = { RequestMethod.GET }) @ResponseBody public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String table, + @RequestParam(value = "project", required = false) String project, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { try { - return streamingService.getStreamingConfigs(table, limit, offset); + // query all streaming config or query one streaming config + if (!Strings.isEmpty(table) && !Strings.isEmpty(project)) { + // check the table metadata + if (tableService.getTableDescByName(table, false, project) == null) { + // the table metadata doesn't exist + throw new InternalErrorException(String.format(Locale.ROOT, + "The table %s of project %s doesn't exist, please make the stream table exists", + table, project)); + } + } + return streamingService.getStreamingConfigs(table, project, limit, offset); } catch (IOException e) { logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e); throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); @@ -140,10 +152,15 @@ public class StreamingV2Controller extends BasicController { try { try { tableDesc.setUuid(UUID.randomUUID().toString()); + if (tableService.getTableDescByName(tableDesc.getIdentity(), false, project) != null) { + throw new IOException(String.format(Locale.ROOT, + "The table %s of project %s exists", + tableDesc.getIdentity(), project)); + } tableService.loadTableToProject(tableDesc, null, project); saveTableSuccess = true; } catch (IOException e) { - throw new BadRequestException("Failed to add streaming table."); + throw new BadRequestException("Failed to add streaming table, because of " + e.getMessage()); } try { streamingSourceConfig.setName(tableDesc.getIdentity()); @@ -159,7 +176,8 @@ public class StreamingV2Controller extends BasicController { if (!saveTableSuccess || !saveStreamingSuccess) { if (saveTableSuccess) { try { - tableService.unloadHiveTable(tableDesc.getIdentity(), project); + // just drop the table metadata and don't drop the stream source config info + tableService.unloadHiveTable(tableDesc.getIdentity(), project, false); } catch (IOException e) { shouldThrow = new InternalErrorException( "Action failed and failed to rollback the create table " + e.getLocalizedMessage(), e); @@ -280,7 +298,7 @@ public class StreamingV2Controller extends BasicController { final String user = SecurityContextHolder.getContext().getAuthentication().getName(); logger.info("{} try to updateStreamingConfig.", user); try { - streamingSourceConfig = streamingService.updateStreamingConfig(streamingSourceConfig); + streamingService.updateStreamingConfig(streamingSourceConfig); } catch (AccessDeniedException accessDeniedException) { throw new ForbiddenException("You don't have right to update this StreamingSourceConfig."); } catch (Exception e) { @@ -292,10 +310,13 @@ public class StreamingV2Controller extends BasicController { return streamingRequest; } - @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE }) + @Deprecated + @RequestMapping(value = "/{project}/{configName}", method = { RequestMethod.DELETE }, produces = { + "application/json" }) @ResponseBody - public void deleteConfig(@PathVariable String configName) throws IOException { - StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName); + public void deleteConfig(@PathVariable String project, @PathVariable String configName) throws IOException { + // This method will never be called by the frontend. + StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName, project); if (null == config) { throw new NotFoundException("StreamingSourceConfig with name " + configName + " not found.."); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index 6c529b9..2af8f54 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -159,7 +159,7 @@ public class TableController extends BasicController { try { for (String tableName : StringUtil.splitByComma(tables)) { tableACLService.deleteFromTableACLByTbl(project, tableName); - if (tableService.unloadHiveTable(tableName, project)) { + if (tableService.unloadHiveTable(tableName, project, true)) { unLoadSuccess.add(tableName); } else { unLoadFail.add(tableName); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java index 40805a9..c06d56f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -152,7 +152,7 @@ public class ProjectService extends BasicService { public void deleteProject(String projectName, ProjectInstance project) throws IOException { Set<String> tables = project.getTables(); for (String table : Sets.newTreeSet(tables)) { - tableService.unloadHiveTable(table, projectName); + tableService.unloadHiveTable(table, projectName, true); getTableManager().removeTableExt(table, projectName); getTableACLManager().deleteTableACLByTbl(projectName, table); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java index c43d625..3ef0ee2 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingV2Service.java @@ -105,12 +105,12 @@ public class StreamingV2Service extends BasicService { receiverAdminClient = adminClient; } - public List<StreamingSourceConfig> listAllStreamingConfigs(final String table) throws IOException { + public List<StreamingSourceConfig> listAllStreamingConfigs(final String table, final String projectName) throws IOException { List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList(); - if (StringUtils.isEmpty(table)) { + if (StringUtils.isEmpty(table) || StringUtils.isEmpty(projectName)) { streamingSourceConfigs = getStreamingManagerV2().listAllStreaming(); } else { - StreamingSourceConfig config = getStreamingManagerV2().getConfig(table); + StreamingSourceConfig config = getStreamingManagerV2().getConfig(table, projectName); if (config != null) { streamingSourceConfigs.add(config); } @@ -119,10 +119,10 @@ public class StreamingV2Service extends BasicService { return streamingSourceConfigs; } - public List<StreamingSourceConfig> getStreamingConfigs(final String table, final Integer limit, final Integer offset) + public List<StreamingSourceConfig> getStreamingConfigs(final String table, final String projectName, final Integer limit, final Integer offset) throws IOException { List<StreamingSourceConfig> streamingSourceConfigs; - streamingSourceConfigs = listAllStreamingConfigs(table); + streamingSourceConfigs = listAllStreamingConfigs(table, projectName); if (limit == null || offset == null) { return streamingSourceConfigs; @@ -138,7 +138,7 @@ public class StreamingV2Service extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#project, 'ADMINISTRATION')") public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException { - if (getStreamingManagerV2().getConfig(config.getName()) != null) { + if (getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) { throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists"); } StreamingSourceConfig streamingSourceConfig = getStreamingManagerV2().saveStreamingConfig(config); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 069a460..2d76acc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -284,9 +284,10 @@ public class TableService extends BasicService { * that's why we have two if statement here. * @param tableName * @param project + * @param needRemoveStreamInfo * @return */ - public boolean unloadHiveTable(String tableName, String project) throws IOException { + public boolean unloadHiveTable(String tableName, String project, boolean needRemoveStreamInfo) throws IOException { aclEvaluate.checkProjectAdminPermission(project); Message msg = MsgPicker.getMsg(); @@ -319,7 +320,9 @@ public class TableService extends BasicService { // remove streaming info SourceManager sourceManager = SourceManager.getInstance(KylinConfig.getInstanceFromEnv()); ISource source = sourceManager.getCachedSource(desc); - source.unloadTable(tableName, project); + if (!desc.isStreamingTable() || needRemoveStreamInfo) { + source.unloadTable(tableName, project); + } return rtn; } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java index 3147a59..9bb4412 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfig.java @@ -25,6 +25,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Map; +import org.apache.directory.api.util.Strings; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; @@ -32,6 +33,7 @@ import org.apache.kylin.common.persistence.Serializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.shaded.com.google.common.collect.Maps; /** @@ -53,10 +55,28 @@ public class StreamingSourceConfig extends RootPersistentEntity { @JsonProperty("properties") private Map<String, String> properties = Maps.newLinkedHashMap(); - public static String concatResourcePath(String name) { - return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + ".json"; + @JsonProperty("project_name") + private String projectName; + + @Deprecated + static String concatResourcePath(String name) { + return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + MetadataConstants.FILE_SURFIX; + } + + public static String concatResourcePathWithProjName(String name, String projectName) { + if (Strings.isEmpty(projectName)) { + return concatResourcePath(name); + } else { + // like table desc + return ResourceStore.STREAMING_V2_RESOURCE_ROOT + "/" + name + "--" + projectName + MetadataConstants.FILE_SURFIX; + } + } + + public String getResourcePathWithProjName() { + return concatResourcePathWithProjName(name, projectName); } + @Deprecated public String getResourcePath() { return concatResourcePath(name); } @@ -85,6 +105,14 @@ public class StreamingSourceConfig extends RootPersistentEntity { this.parserInfo = parserInfo; } + public void setProjectName(String projectName) { + this.projectName = projectName; + } + + public String getProjectName() { + return projectName; + } + @Override public StreamingSourceConfig clone() { try { diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java index 7a0d62b..e619003 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/StreamingSourceConfigManager.java @@ -24,6 +24,7 @@ import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; +import org.apache.directory.api.util.Strings; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; @@ -83,7 +84,6 @@ public class StreamingSourceConfigManager { ResourceStore store = getStore(); logger.info("Load all streaming metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_V2_RESOURCE_ROOT)); - List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_V2_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); for (String path : paths) { @@ -94,7 +94,10 @@ public class StreamingSourceConfigManager { logger.error("Error loading streaming desc " + path, e); continue; } - if (path.equals(streamingSourceConfig.getResourcePath()) == false) { + // check path without project name + // check path with project name + if (path.equals(streamingSourceConfig.getResourcePath()) == false && + path.equals(streamingSourceConfig.getResourcePathWithProjName()) == false) { logger.error("Skip suspicious desc at " + path + ", " + streamingSourceConfig + " should be at " + streamingSourceConfig.getResourcePath()); continue; @@ -113,26 +116,62 @@ public class StreamingSourceConfigManager { * @param name * @throws IOException */ - public StreamingSourceConfig reloadStreamingConfigLocal(String name) throws IOException { - - // Save Source - String path = StreamingSourceConfig.concatResourcePath(name); - + public StreamingSourceConfig reloadStreamingConfigLocal(String name, String projectName) throws IOException { + if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) { + throw new StreamingException(String.format(Locale.ROOT, + "the table name %s or project name %s is null", name, projectName)); + } + // path with project name + String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName); // Reload the StreamingSourceConfig - StreamingSourceConfig ndesc = loadStreamingConfigAt(path); - return ndesc; + StreamingSourceConfig config = loadStreamingConfigAt(path); + if (config == null) { + // the path with project name doesn't contain the source config, and check the old path without project name. + path = StreamingSourceConfig.concatResourcePath(name); + config = loadStreamingConfigAt(path); + if (config != null) { + config.setProjectName(projectName); + // remove from the old path, and save the source config to the new path + removeStreamingConfig(config); + saveStreamingConfig(config); + } + } + return config; } // remove streamingSourceConfig public void removeStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException { - String path = streamingSourceConfig.getResourcePath(); - getStore().deleteResource(path); + // path with project name + String path = streamingSourceConfig.getResourcePathWithProjName(); + if (loadStreamingConfigAt(path) != null) { + getStore().deleteResource(path); + } else { + // The source is stored in the old path which is prefix + table name + suffix + path = streamingSourceConfig.getResourcePath(); + getStore().deleteResource(path); + } + } + + public StreamingSourceConfig getConfig(String name, String projectName) { + name = name.toUpperCase(Locale.ROOT); + try { + return reloadStreamingConfigLocal(name, projectName); + } catch (IOException e) { + throw new StreamingException(e); + } } - public StreamingSourceConfig getConfig(String name) { + public StreamingSourceConfig getConfigMustWithProject(String name, String projectName) { name = name.toUpperCase(Locale.ROOT); + if (Strings.isEmpty(name) || Strings.isEmpty(projectName)) { + throw new StreamingException(String.format(Locale.ROOT, + "the table name %s or project name %s is null", name, projectName)); + } + // path with project name + String path = StreamingSourceConfig.concatResourcePathWithProjName(name, projectName); + // Reload the StreamingSourceConfig try { - return reloadStreamingConfigLocal(name); + return loadStreamingConfigAt(path); } catch (IOException e) { throw new StreamingException(e); } @@ -140,32 +179,35 @@ public class StreamingSourceConfigManager { /** * - * @param desc + * @param streamingSourceConfig * @return * @throws IOException */ - public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig desc) throws IOException { + public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException { // Validate CubeDesc - if (desc.getUuid() == null || desc.getName() == null) { + if (streamingSourceConfig.getUuid() == null || streamingSourceConfig.getName() == null) { throw new IllegalArgumentException("SteamingConfig Illegal."); } - // Save Source - String path = desc.getResourcePath(); - getStore().putResource(path, desc, System.currentTimeMillis(), STREAMING_SERIALIZER); + // remove Source + removeStreamingConfig(streamingSourceConfig); + + // Save Source, the path with project name + String path = streamingSourceConfig.getResourcePathWithProjName(); + getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(), STREAMING_SERIALIZER); // Reload the StreamingSourceConfig - StreamingSourceConfig ndesc = loadStreamingConfigAt(path); + StreamingSourceConfig newStreamingSourceConfig = loadStreamingConfigAt(path); - return ndesc; + return newStreamingSourceConfig; } public StreamingSourceConfig saveStreamingConfig(StreamingSourceConfig streamingSourceConfig) throws IOException { if (streamingSourceConfig == null || StringUtils.isEmpty(streamingSourceConfig.getName())) { throw new IllegalArgumentException(); } - - String path = StreamingSourceConfig.concatResourcePath(streamingSourceConfig.getName()); + // path = prefix + /table name---project name + suffix + String path = streamingSourceConfig.getResourcePathWithProjName(); getStore().putResource(path, streamingSourceConfig, System.currentTimeMillis(), StreamingSourceConfig.SERIALIZER); return streamingSourceConfig; diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 7ace4ec..d09d428 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -703,7 +703,7 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis CubeDescManager.getInstance(kylinConfig).reloadCubeDescLocal(cubeName); CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).reloadCubeQuietly(cubeName); StreamingSourceConfigManager.getInstance(kylinConfig).reloadStreamingConfigLocal( - cubeInstance.getRootFactTable()); + cubeInstance.getRootFactTable(), cubeInstance.getProject()); } private String calLocalSegmentCacheDir() { diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java index 2dc2a9c..4173a5c 100644 --- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaBatchSourceAdaptor.java @@ -34,7 +34,7 @@ public class KafkaBatchSourceAdaptor extends HiveSource { public void unloadTable(String tableName, String project) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); StreamingSourceConfigManager sourceConfigManager = StreamingSourceConfigManager.getInstance(kylinConfig); - StreamingSourceConfig config = sourceConfigManager.getConfig(tableName); + StreamingSourceConfig config = sourceConfigManager.getConfig(tableName, project); if (config == null) { return; } diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java index 7db1961..e2ab584 100644 --- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/KafkaSource.java @@ -76,8 +76,9 @@ public class KafkaSource implements IStreamingSource { KylinConfig kylinConf = KylinConfig.getInstanceFromEnv(); CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName); String streamingTableName = cube.getRootFactTable(); + String projectName = cube.getProject(); StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf) - .getConfig(streamingTableName); + .getConfig(streamingTableName, projectName); String topicName = getTopicName(streamingSourceConfig.getProperties()); Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig()); @@ -145,8 +146,9 @@ public class KafkaSource implements IStreamingSource { CubeInstance cubeInstance = CubeManager.getInstance(kylinConf).getCube(cubeName); IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cubeInstance); String streamingName = cubeInstance.getRootFactTable(); + String projectName = cubeInstance.getProject(); StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf) - .getConfig(streamingName); + .getConfig(streamingName, projectName); String topic = getTopicName(streamingSourceConfig.getProperties()); Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cubeInstance.getConfig()); @@ -295,7 +297,7 @@ public class KafkaSource implements IStreamingSource { CubeInstance cube = CubeManager.getInstance(kylinConf).getCube(cubeName); String streamingTableName = cube.getRootFactTable(); StreamingSourceConfig streamingSourceConfig = StreamingSourceConfigManager.getInstance(kylinConf) - .getConfig(streamingTableName); + .getConfig(streamingTableName, cube.getProject()); String topicName = getTopicName(streamingSourceConfig.getProperties()); ISourcePosition sourcePosition = new KafkaPosition(); Map<String, Object> conf = getKafkaConf(streamingSourceConfig.getProperties(), cube.getConfig()); diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java index 033a669..0c3d6e9 100644 --- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java @@ -19,6 +19,7 @@ package org.apache.kylin.tool; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +72,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.stream.core.source.StreamingSourceConfig; +import org.apache.kylin.stream.core.source.StreamingSourceConfigManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -359,7 +361,18 @@ public class CubeMigrationCLI extends AbstractApplication { if (cubeDesc.isStreamingCube()) { // add streaming source config info for streaming cube - metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName())); + String tableName = cubeDesc.getModel().getRootFactTableName(); + String projectName = cubeDesc.getProject(); + KylinConfig kylinConf = KylinConfig.getInstanceFromEnv(); + StreamingSourceConfigManager manager = StreamingSourceConfigManager.getInstance(kylinConf); + StreamingSourceConfig sourceConfig = manager.getConfig(tableName, projectName); + if (sourceConfig != null) { + metaResource.add(sourceConfig.getResourcePathWithProjName()); + } else { + throw new InterruptedIOException(String.format(Locale.ROOT, + "The stream source config doesn't exist, the table name: %s, the project name: %s", + tableName, projectName)); + } } } diff --git a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java index 599e58d..893365a 100644 --- a/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/extractor/CubeMetaExtractor.java @@ -20,7 +20,6 @@ package org.apache.kylin.tool.extractor; import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Objects; import java.util.Set; @@ -444,18 +443,10 @@ public class CubeMetaExtractor extends AbstractInfoExtractor { } private void addStreamingV2Config(CubeInstance cube) { - Collection<StreamingSourceConfig> streamingConfigs; - try { - streamingConfigs = streamingSourceConfigManager.listAllStreaming(); - } catch (IOException ioe) { - logger.error("", ioe); - return; - } - for (StreamingSourceConfig streamingConfig : streamingConfigs) { - if (streamingConfig.getName() != null - && streamingConfig.getName().equalsIgnoreCase(cube.getRootFactTable())) { - addRequired(StreamingSourceConfig.concatResourcePath(streamingConfig.getName())); - } + StreamingSourceConfig streamingSourceConfig = + streamingSourceConfigManager.getConfig(cube.getRootFactTable(), cube.getProject()); + if (streamingSourceConfig != null) { + addRequired(streamingSourceConfig.getResourcePathWithProjName()); } } diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js index f75a1c4..3ebabbb 100755 --- a/webapp/app/js/controllers/sourceMeta.js +++ b/webapp/app/js/controllers/sourceMeta.js @@ -1105,7 +1105,8 @@ KylinApp $scope.streamingConfig = { name: '', properties: {}, - parser_info: {} + parser_info: {}, + project_name: '' }; $scope.tableData = { @@ -1399,6 +1400,7 @@ KylinApp $scope.streamingConfig.parser_info.ts_parser = $scope.streaming.TSParser; $scope.streamingConfig.parser_info.ts_pattern = $scope.streaming.TSPattern; $scope.streamingConfig.parser_info.field_mapping = {}; + $scope.streamingConfig.project_name = projectName; $scope.tableData.columns.forEach(function(col) { if (col.comment) { $scope.streamingConfig.parser_info.field_mapping[col.name] = col.comment.replace(/\|/g, '.') || '' @@ -1530,7 +1532,8 @@ KylinApp if (_.values(tableConfig.streamingSourceType).indexOf($scope.tableModel.selectedSrcTable.source_type) > -1) { var table = $scope.tableModel.selectedSrcTable; var streamingName = table.database+"."+table.name; - StreamingServiceV2.getConfig({table:streamingName}, function (configs) { + var projectName = $scope.projectModel.getSelectedProject(); + StreamingServiceV2.getConfig({table:streamingName, project: projectName}, function (configs) { $scope.currentStreamingConfig = configs[0]; }); }