KYLIN-2323 Refactor table load/unload
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e2e2a81c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e2e2a81c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e2e2a81c

Branch: refs/heads/master-hbase1.x
Commit: e2e2a81c1f1d7e3f9af5c81bb4f1ad14d2d1b859
Parents: 6ea03b8
Author: Billy Liu <billy...@apache.org>
Authored: Thu Dec 29 18:48:38 2016 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Thu Dec 29 18:48:38 2016 +0800

----------------------------------------------------------------------
 .../hive/ITHiveSourceTableLoaderTest.java       |   2 +-
 .../rest/controller/StreamingController.java    |  22 +-
 .../kylin/rest/controller/TableController.java  | 268 ++++------------
 .../apache/kylin/rest/service/CubeService.java  |  96 ------
 .../apache/kylin/rest/service/TableService.java | 318 +++++++++++++++++++
 .../source/hive/HiveSourceTableLoader.java      |   2 +-
 webapp/app/js/controllers/sourceMeta.js         |  16 +-
 webapp/app/js/services/tables.js                |   1 -
 8 files changed, 391 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..7aff3ba 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
     public void test() throws IOException {
         KylinConfig config = getTestConfig();
         String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
         assertTrue(loaded.size() == toLoad.length);
 
         for (String str : toLoad)

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e04ebc8..0ced9ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -23,10 +23,8 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -34,9 +32,9 @@ import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.KafkaConfigService;
 import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.rest.service.TableService;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +67,7 @@ public class StreamingController extends BasicController {
     @Autowired
     private KafkaConfigService kafkaConfigService;
 
     @Autowired
-    private CubeService cubeMgmtService;
+    private TableService tableService;
 
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
@@ -113,10 +111,7 @@ public class StreamingController extends BasicController {
         boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
 
         try {
-            tableDesc.setUuid(UUID.randomUUID().toString());
-            MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-            metaMgr.saveSourceTable(tableDesc);
-            cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);
+            tableService.addStreamingTable(tableDesc, project);
         } catch (IOException e) {
             throw new BadRequestException("Failed to add streaming table.");
         }
@@ -289,15 +284,4 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
-    public void setStreamingService(StreamingService streamingService) {
-        this.streamingService = streamingService;
-    }
-
-    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
-        this.kafkaConfigService = kafkaConfigService;
-    }
-
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 74d1b28..eed5413 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -19,37 +19,18 @@
 package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
-import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.response.TableDescResponse;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.ModelService;
-import org.apache.kylin.rest.service.ProjectService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.IHiveClient;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.rest.service.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -70,44 +51,27 @@ import com.google.common.collect.Sets;
 @Controller
 @RequestMapping(value = "/tables")
 public class TableController extends BasicController {
+
     private static final Logger logger = LoggerFactory.getLogger(TableController.class);
 
     @Autowired
-    private CubeService cubeMgmtService;
-    @Autowired
-    private ProjectService projectService;
-    @Autowired
-    private StreamingService streamingService;
-    @Autowired
-    private KafkaConfigService kafkaConfigService;
-    @Autowired
-    private ModelService modelService;
+    private TableService tableService;
 
     /**
-     * Get available table list of the input database
+     * Get available table list of the project
      *
      * @return Table metadata array
     * @throws IOException
     */
    @RequestMapping(value = "", method = { RequestMethod.GET })
    @ResponseBody
-    public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
-        long start = System.currentTimeMillis();
-        List<TableDesc> tables = null;
+    public List<TableDesc> getTableDesc(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
         try {
-            tables = cubeMgmtService.getProjectManager().listDefinedTables(project);
-        } catch (Exception e) {
-            logger.error("Failed to deal with the request.", e);
+            return tableService.getTableDescByProject(project, withExt);
+        } catch (IOException e) {
+            logger.error("Failed to get Hive Tables", e);
             throw new InternalErrorException(e.getLocalizedMessage());
         }
-
-        if (withExt) {
-            tables = cloneTableDesc(tables);
-        }
-        long end = System.currentTimeMillis();
-        logger.info("Return all table metadata in " + (end - start) + " seconds");
-
-        return tables;
     }
 
     /**
@@ -118,29 +82,39 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/{tableName:.+}", method = { RequestMethod.GET })
     @ResponseBody
-    public TableDesc getHiveTable(@PathVariable String tableName) {
-        return cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-    }
-
-    @RequestMapping(value = "/reload", method = { RequestMethod.PUT })
-    @ResponseBody
-    public String reloadSourceTable() {
-        cubeMgmtService.getMetadataManager().reload();
-        return "ok";
+    public TableDesc getTableDesc(@PathVariable String tableName) {
+        TableDesc table = tableService.getTableDescByName(tableName, false);
+        if (table == null)
+            throw new NotFoundException("Could not find Hive table: " + tableName);
+        return table;
     }
 
     @RequestMapping(value = "/{tables}/{project}", method = { RequestMethod.POST })
     @ResponseBody
-    public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
+    public Map<String, String[]> loadHiveTables(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
-        String[] loaded = cubeMgmtService.reloadHiveTable(tables);
-        if (request.isCalculate()) {
-            cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
-        }
-        cubeMgmtService.syncTableToProject(loaded, project);
         Map<String, String[]> result = new HashMap<String, String[]>();
-        result.put("result.loaded", loaded);
-        result.put("result.unloaded", new String[] {});
+        String[] tableNames = tables.split(",");
+        try {
+            String[] loaded = tableService.loadHiveTablesToProject(tableNames, project);
+            result.put("result.loaded", loaded);
+            Set<String> allTables = new HashSet<String>();
+            for (String tableName : tableNames) {
+                allTables.add(tableService.normalizeHiveTableName(tableName));
+            }
+            for (String loadedTableName : loaded) {
+                allTables.remove(loadedTableName);
+            }
+            String[] unloaded = new String[allTables.size()];
+            allTables.toArray(unloaded);
+            result.put("result.unloaded", unloaded);
+            if (request.isCalculate()) {
+                tableService.calculateCardinalityIfNotPresent(loaded, submitter);
+            }
+        } catch (Exception e) {
+            logger.error("Failed to load Hive Table", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
         return result;
     }
 
@@ -150,12 +124,17 @@ public class TableController extends BasicController {
         Set<String> unLoadSuccess = Sets.newHashSet();
         Set<String> unLoadFail = Sets.newHashSet();
         Map<String, String[]> result = new HashMap<String, String[]>();
-        for (String tableName : tables.split(",")) {
-            if (unLoadHiveTable(tableName, project)) {
-                unLoadSuccess.add(tableName);
-            } else {
-                unLoadFail.add(tableName);
+        try {
+            for (String tableName : tables.split(",")) {
+                if (tableService.unLoadHiveTable(tableName, project)) {
+                    unLoadSuccess.add(tableName);
+                } else {
+                    unLoadFail.add(tableName);
+                }
             }
+        } catch (Exception e) {
+            logger.error("Failed to unload Hive Table", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
         result.put("result.unload.success", (String[]) unLoadSuccess.toArray(new String[unLoadSuccess.size()]));
         result.put("result.unload.fail", (String[]) unLoadFail.toArray(new String[unLoadFail.size()]));
@@ -163,77 +142,6 @@ public class TableController extends BasicController {
     }
 
     /**
-     * table may referenced by several projects, and kylin only keep one copy of meta for each table,
-     * that's why we have two if statement here.
-     * @param tableName
-     * @param project
-     * @return
-     */
-    private boolean unLoadHiveTable(String tableName, String project) {
-        boolean rtn = false;
-        int tableType = 0;
-
-        //remove streaming info
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc desc = cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-        if (desc == null)
-            return false;
-        tableType = desc.getSourceType();
-
-        try {
-            if (!modelService.isTableInModel(tableName, project)) {
-                cubeMgmtService.removeTableFromProject(tableName, project);
-                rtn = true;
-            } else {
-                List<String> models = modelService.getModelsUsingTable(tableName, project);
-                throw new InternalErrorException("Table is already in use by models " + models);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            try {
-                cubeMgmtService.unLoadHiveTable(tableName);
-                rtn = true;
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-                rtn = false;
-            }
-        }
-
-        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            StreamingConfig config = null;
-            KafkaConfig kafkaConfig = null;
-            try {
-                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
-                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
-                streamingService.dropStreamingConfig(config);
-                kafkaConfigService.dropKafkaConfig(kafkaConfig);
-                rtn = true;
-            } catch (Exception e) {
-                rtn = false;
-                logger.error(e.getLocalizedMessage(), e);
-            }
-        }
-        return rtn;
-    }
-
-    @RequestMapping(value = "/addStreamingSrc", method = { RequestMethod.POST })
-    @ResponseBody
-    public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
-        Map<String, String> result = new HashMap<String, String>();
-        String project = request.getProject();
-        TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
-        desc.setUuid(UUID.randomUUID().toString());
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        metaMgr.saveSourceTable(desc);
-        cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
-        result.put("success", "true");
-        return result;
-    }
-
-    /**
     * Regenerate table cardinality
     *
     * @return Table metadata array
@@ -244,57 +152,15 @@ public class TableController extends BasicController {
    public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException {
        String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
        String[] tables = tableNames.split(",");
-        for (String table : tables) {
-            cubeMgmtService.calculateCardinality(table.trim().toUpperCase(), submitter);
-        }
-        return request;
-    }
-
-    /**
-     * @param tables
-     * @return
-     */
-    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
-        if (null == tables) {
-            return Collections.emptyList();
-        }
-
-        List<TableDesc> descs = new ArrayList<TableDesc>();
-        Iterator<TableDesc> it = tables.iterator();
-        while (it.hasNext()) {
-            TableDesc table = it.next();
-            TableExtDesc tableExtDesc = cubeMgmtService.getMetadataManager().getTableExt(table.getIdentity());
-
-            // Clone TableDesc
-            TableDescResponse rtableDesc = new TableDescResponse(table);
-            Map<String, Long> cardinality = new HashMap<String, Long>();
-            Map<String, String> dataSourceProp = new HashMap<>();
-            String scard = tableExtDesc.getCardinality();
-            if (!StringUtils.isEmpty(scard)) {
-                String[] cards = StringUtils.split(scard, ",");
-                ColumnDesc[] cdescs = rtableDesc.getColumns();
-                for (int i = 0; i < cdescs.length; i++) {
-                    ColumnDesc columnDesc = cdescs[i];
-                    if (cards.length > i) {
-                        cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
-                    } else {
-                        logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length);
-                        break;
-                    }
-                }
-                rtableDesc.setCardinality(cardinality);
+        try {
+            for (String table : tables) {
+                tableService.calculateCardinality(table.trim().toUpperCase(), submitter);
             }
-            dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
-            dataSourceProp.put("location", tableExtDesc.getStorageLocation());
-            dataSourceProp.put("owner", tableExtDesc.getOwner());
-            dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
-            dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
-            dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
-            rtableDesc.setDescExd(dataSourceProp);
-            descs.add(rtableDesc);
+        } catch (IOException e) {
+            logger.error("Failed to calculate cardinality", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
-
-        return descs;
+        return request;
     }
 
     /**
@@ -305,17 +171,12 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveDatabases() throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = null;
-
+    private List<String> showHiveDatabases() throws IOException {
         try {
-            results = hiveClient.getHiveDbNames();
+            return tableService.getHiveDbNames();
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
-        return results;
     }
 
     /**
@@ -326,21 +187,12 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveTables(@PathVariable String database) throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = null;
-
+    private List<String> showHiveTables(@PathVariable String database) throws IOException {
        try {
-            results = hiveClient.getHiveTableNames(database);
+            return tableService.getHiveTableNames(database);
        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
+            throw new InternalErrorException(e.getLocalizedMessage());
        }
-        return results;
-    }
-
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 85c9284..23aa5a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.WeakHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -41,17 +40,10 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -63,9 +55,6 @@ import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
-import org.apache.kylin.source.hive.HiveSourceTableLoader;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.slf4j.Logger;
@@ -438,53 +427,6 @@ public class CubeService extends BasicService {
         return hr;
     }
 
-    /**
-     * Generate cardinality for table This will trigger a hadoop job
-     * The result will be merged into table exd info
-     *
-     * @param tableName
-     */
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinality(String tableName, String submitter) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc table = getMetadataManager().getTableDesc(tableName);
-        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
-        if (table == null) {
-            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
-            logger.error("Cannot find table descirptor " + tableName, e);
-            throw e;
-        }
-
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        //make sure the job could be scheduled when the DistributedScheduler is enable.
-        job.setParam("segmentId", tableName);
-        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
-        job.setSubmitter(submitter);
-
-        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
-        String param = "-table " + tableName + " -output " + outPath;
-
-        MapReduceExecutable step1 = new MapReduceExecutable();
-
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
-        step1.setMapReduceParams(param);
-        step1.setParam("segmentId", tableName);
-
-        job.addTask(step1);
-
-        HadoopShellExecutable step2 = new HadoopShellExecutable();
-
-        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
-        step2.setJobParams(param);
-        step2.setParam("segmentId", tableName);
-        job.addTask(step2);
-        tableExt.setJodID(job.getId());
-        getMetadataManager().saveTableExt(tableExt);
-
-        getExecutableManager().addJob(job);
-    }
-
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
     public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) throws IOException {
         CubeDesc desc = cube.getDescriptor();
@@ -546,44 +488,6 @@ public class CubeService extends BasicService {
         CubeManager.getInstance(getConfig()).updateCube(update);
     }
 
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public String[] reloadHiveTable(String tables) throws IOException {
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
-        return (String[]) loaded.toArray(new String[loaded.size()]);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void unLoadHiveTable(String tableName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void syncTableToProject(String[] tables, String project) throws IOException {
-        getProjectManager().addTableDescToProject(tables, project);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void removeTableFromProject(String tableName, String projectName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        getProjectManager().removeTableDescFromProject(tableName, projectName);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
-        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
-        for (String table : tables) {
-            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
-            String jobID = tableExtDesc.getJodID();
-            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
-                calculateCardinality(table, submitter);
-            }
-        }
-    }
-
     public void updateOnNewSegmentReady(String cubeName) {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         String serverMode = kylinConfig.getServerMode();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
new file mode 100644
index 0000000..461800e
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.response.TableDescResponse;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.source.hive.IHiveClient;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.stereotype.Component;
+
+@Component("tableService")
+public class TableService extends BasicService {
+
+    private static final Logger logger = LoggerFactory.getLogger(TableService.class);
+
+    @Autowired
+    private ModelService modelService;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private StreamingService streamingService;
+
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
+        List<TableDesc> tables = getProjectManager().listDefinedTables(project);
+        if (null == tables) {
+            return Collections.emptyList();
+        }
+        if (withExt) {
+            tables = cloneTableDesc(tables);
+        }
+        return tables;
+    }
+
+    public TableDesc getTableDescByName(String tableName, boolean withExt) {
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        if (withExt) {
+            table = cloneTableDesc(table);
+        }
+        return table;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException {
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(tables, getConfig());
+        String[] result = (String[]) loaded.toArray(new String[loaded.size()]);
+        syncTableToProject(result, project);
+        return result;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void unLoadHiveTable(String tableName) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void syncTableToProject(String[] tables, String project) throws IOException {
+        getProjectManager().addTableDescToProject(tables, project);
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void removeTableFromProject(String tableName, String projectName) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        getProjectManager().removeTableDescFromProject(tableName, projectName);
+    }
+
+    /**
+     * table may referenced by several projects, and kylin only keep one copy of meta for each table,
+     * that's why we have two if statement here.
+     * @param tableName
+     * @param project
+     * @return
+     */
+    public boolean unLoadHiveTable(String tableName, String project) {
+        boolean rtn = false;
+        int tableType = 0;
+
+        //remove streaming info
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc desc = getMetadataManager().getTableDesc(tableName);
+        if (desc == null)
+            return false;
+        tableType = desc.getSourceType();
+
+        try {
+            if (!modelService.isTableInModel(tableName, project)) {
+                removeTableFromProject(tableName, project);
+                rtn = true;
+            } else {
+                List<String> models = modelService.getModelsUsingTable(tableName, project);
+                throw new InternalErrorException("Table is already in use by models " + models);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            try {
+                unLoadHiveTable(tableName);
+                rtn = true;
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                rtn = false;
+            }
+        }
+
+        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            StreamingConfig config = null;
+            KafkaConfig kafkaConfig = null;
+            try {
+                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
+                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
+                streamingService.dropStreamingConfig(config);
+                kafkaConfigService.dropKafkaConfig(kafkaConfig);
+                rtn = true;
+            } catch (Exception e) {
+                rtn = false;
+                logger.error(e.getLocalizedMessage(), e);
+            }
+        }
+        return rtn;
+    }
+
+    /**
+     *
+     * @param desc
+     * @param project
+     * @throws IOException
+     */
+    public void addStreamingTable(TableDesc desc, String project) throws IOException {
+        desc.setUuid(UUID.randomUUID().toString());
+        getMetadataManager().saveSourceTable(desc);
+        syncTableToProject(new String[] { desc.getIdentity() }, project);
+    }
+
+    /**
+     *
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveDbNames() throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveDbNames();
+        return results;
+    }
+
+    /**
+     *
+     * @param database
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveTableNames(String database) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveTableNames(database);
+        return results;
+    }
+
+    private TableDescResponse cloneTableDesc(TableDesc table) {
+        TableExtDesc tableExtDesc = getMetadataManager().getTableExt(table.getIdentity());
+
+        // Clone TableDesc
+        TableDescResponse rtableDesc = new TableDescResponse(table);
+        Map<String, Long> cardinality = new HashMap<String, Long>();
+        Map<String, String> dataSourceProp = new HashMap<>();
+        String scard = tableExtDesc.getCardinality();
+        if (!StringUtils.isEmpty(scard)) {
+            String[] cards = StringUtils.split(scard, ",");
+            ColumnDesc[] cdescs = rtableDesc.getColumns();
+            for (int i = 0; i < cdescs.length; i++) {
+                ColumnDesc columnDesc = cdescs[i];
+                if (cards.length > i) {
+                    cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
+                } else {
+                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+                    break;
+                }
+            }
+            rtableDesc.setCardinality(cardinality);
+        }
+        dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
+        dataSourceProp.put("location", tableExtDesc.getStorageLocation());
+        dataSourceProp.put("owner", tableExtDesc.getOwner());
+        dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
+        dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
+        dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
+        rtableDesc.setDescExd(dataSourceProp);
+        return rtableDesc;
+    }
+
+
+    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
+        List<TableDesc> descs = new ArrayList<TableDesc>();
+        Iterator<TableDesc> it = tables.iterator();
+        while (it.hasNext()) {
+            TableDesc table = it.next();
+            TableDescResponse rtableDesc = cloneTableDesc(table);
+            descs.add(rtableDesc);
+        }
+
+        return descs;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+        MetadataManager metaMgr = getMetadataManager();
+        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
+        for (String table : tables) {
+            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
+            String jobID = tableExtDesc.getJodID();
+            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
+                calculateCardinality(table, submitter);
+            }
+        }
+    }
+
+    /**
+     * Generate cardinality for table This will trigger a hadoop job
+     * The result will be merged into table exd info
+     *
+     * @param tableName
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinality(String tableName, String submitter) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
+        if (table == null) {
+            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
+            logger.error("Cannot find table descirptor " + tableName, e);
+            throw e;
+        }
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        //make sure the job could be scheduled when the DistributedScheduler is enable.
+        job.setParam("segmentId", tableName);
+        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
+        job.setSubmitter(submitter);
+
+        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
+        String param = "-table " + tableName + " -output " + outPath;
+
+        MapReduceExecutable step1 = new MapReduceExecutable();
+
+        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
+
+        job.addTask(step1);
+
+        HadoopShellExecutable step2 = new HadoopShellExecutable();
+
+        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
+        job.addTask(step2);
+        tableExt.setJodID(job.getId());
+        getMetadataManager().saveTableExt(tableExt);
+
+        getExecutableManager().addJob(job);
+    }
+
+    public String normalizeHiveTableName(String tableName) {
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        return dbTableName[0] + "." + dbTableName[1];
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 77e1084..b56009a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -51,7 +51,7 @@ public class HiveSourceTableLoader {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+    public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
         SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
 
         for (String fullTableName : hiveTables) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index bbb9915..a53a35f 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -330,7 +330,7 @@ KylinApp
     }
 
     if ($scope.tableNames.trim() === "") {
-      SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+      SweetAlert.swal('', 'Please input table(s) you want to load.', 'info');
       return;
     }
 
@@ -352,13 +352,13 @@ KylinApp
       })
 
       if (result['result.unloaded'].length != 0 && result['result.loaded'].length == 0) {
-        SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unloadedTableInfo, 'error');
+        SweetAlert.swal('Failed!', 'Failed to load following table(s): ' + unloadedTableInfo, 'error');
       }
       if (result['result.loaded'].length != 0 && result['result.unloaded'].length == 0) {
-        SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo, 'success');
+        SweetAlert.swal('Success!', 'The following table(s) have been successfully loaded: ' + loadTableInfo, 'success');
       }
       if (result['result.loaded'].length != 0 && result['result.unloaded'].length != 0) {
-        SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo + "\n\n Failed to synchronize following table(s):" + unloadedTableInfo, 'warning');
+        SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully loaded: ' + loadTableInfo + "\n\n Failed to load following table(s):" + unloadedTableInfo, 'warning');
       }
       loadingRequest.hide();
       scope.aceSrcTbLoaded(true);
@@ -378,7 +378,7 @@ KylinApp
 
   $scope.remove = function () {
     if ($scope.tableNames.trim() === "") {
-      SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+      SweetAlert.swal('', 'Please input table(s) you want to unload.', 'info');
       return;
     }
 
@@ -400,13 +400,13 @@ KylinApp
       })
 
       if (result['result.unload.fail'].length != 0 && result['result.unload.success'].length == 0) {
-        SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unRemovedTableInfo, 'error');
+        SweetAlert.swal('Failed!', 'Failed to unload following table(s): ' + unRemovedTableInfo, 'error');
       }
       if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length == 0) {
-        SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo, 'success');
+        SweetAlert.swal('Success!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo, 'success');
       }
       if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length != 0) {
-        SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo + "\n\n Failed to synchronize following table(s):" + unRemovedTableInfo, 'warning');
+        SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo + "\n\n Failed to unload following table(s):" + unRemovedTableInfo, 'warning');
       }
       loadingRequest.hide();
       scope.aceSrcTbLoaded(true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 4199d6c..4e7a7c4 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -24,7 +24,6 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
     reload: {method: 'PUT', params: {action: 'reload'}, isArray: false},
     loadHiveTable: {method: 'POST', params: {}, isArray: false},
     unLoadHiveTable: {method: 'DELETE', params: {}, isArray: false},
-    addStreamingSrc: {method: 'POST', params: {action:'addStreamingSrc'}, isArray: false},
     genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false},
     showHiveDatabases: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
    showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true}
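
For readers tracing the refactor end to end, the sketch below shows one way to drive the relocated load endpoint (POST /tables/{tables}/{project}, now backed by TableService.loadHiveTablesToProject) from plain Java. It is a minimal illustration, not part of the commit: the host and port, the ADMIN:KYLIN credentials, the learn_kylin project, and the class name LoadTablesExample are all assumptions for a default local Kylin sandbox.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Base64;

    // Hypothetical driver for the refactored load endpoint; assumes a local
    // Kylin instance at localhost:7070 with the default ADMIN/KYLIN account.
    public class LoadTablesExample {
        public static void main(String[] args) throws Exception {
            // POST /kylin/api/tables/{tables}/{project} is routed to
            // TableController.loadHiveTables(), which delegates to TableService.
            URL url = new URL("http://localhost:7070/kylin/api/tables/DEFAULT.TEST_KYLIN_FACT/learn_kylin");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            String auth = Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes("UTF-8"));
            conn.setRequestProperty("Authorization", "Basic " + auth);
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            // HiveTableRequest body; calculate=true also queues the cardinality
            // job via TableService.calculateCardinalityIfNotPresent().
            conn.getOutputStream().write("{\"calculate\": true}".getBytes("UTF-8"));

            // The JSON response carries "result.loaded" and "result.unloaded",
            // the split the controller computes with normalizeHiveTableName().
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }

Unloading goes through the same controller with the DELETE verb and reports "result.unload.success" and "result.unload.fail", matching the keys the webapp now reads in sourceMeta.js.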