KYLIN-2187 Update table_ext metadata in a couple of classes

Signed-off-by: Li Yang <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/40dca957 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/40dca957 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/40dca957 Branch: refs/heads/master Commit: 40dca95711e5e9c35df0b4ff30f4a87069f3d286 Parents: d1c0b58 Author: Cheng Wang <[email protected]> Authored: Tue Nov 15 19:06:26 2016 +0800 Committer: Li Yang <[email protected]> Committed: Wed Nov 16 10:53:43 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/metadata/MetadataManager.java | 132 ++++++++----------- .../kylin/rest/controller/TableController.java | 72 ++++------ .../kylin/rest/response/TableDescResponse.java | 7 +- .../apache/kylin/rest/service/CubeService.java | 17 ++- .../apache/kylin/rest/service/JobService.java | 5 + .../source/hive/HiveSourceTableLoader.java | 35 ++--- .../cardinality/HiveColumnCardinalityJob.java | 1 + .../HiveColumnCardinalityUpdateJob.java | 9 +- 8 files changed, 120 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java index e0c78ce..5e12016 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java @@ -18,27 +18,19 @@ package org.apache.kylin.metadata; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import 
java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.io.IOUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; @@ -46,13 +38,13 @@ import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.ExternalFilterDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; /** * Serves (and caches) metadata for Kylin instance. 
@@ -67,6 +59,7 @@ public class MetadataManager { private static final Logger logger = LoggerFactory.getLogger(MetadataManager.class); public static final Serializer<TableDesc> TABLE_SERIALIZER = new JsonSerializer<TableDesc>(TableDesc.class); + public static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new JsonSerializer<TableExtDesc>(TableExtDesc.class); public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<DataModelDesc>(DataModelDesc.class); public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>(ExternalFilterDesc.class); @@ -108,7 +101,7 @@ public class MetadataManager { // table name ==> SourceTable private CaseInsensitiveStringCache<TableDesc> srcTableMap; // name => value - private CaseInsensitiveStringCache<Map<String, String>> srcTableExdMap; + private CaseInsensitiveStringCache<TableExtDesc> srcTableExdMap; // name => DataModelDesc private CaseInsensitiveStringCache<DataModelDesc> dataModelDescMap; // name => External Filter Desc @@ -155,7 +148,7 @@ public class MetadataManager { return Collections.unmodifiableMap(srcTableMap.getMap()); } - public Map<String, Map<String, String>> listAllTableExdMap() { + public Map<String, TableExtDesc> listAllTableExdMap() { return srcTableExdMap.getMap(); } @@ -199,23 +192,43 @@ public class MetadataManager { * @param tableName * @return */ - public Map<String, String> getTableDescExd(String tableName) { - String tableIdentity = tableName; - Map<String, String> result = new HashMap<String, String>(); - if (srcTableExdMap.containsKey(tableIdentity)) { - Map<String, String> tmp = srcTableExdMap.get(tableIdentity); - Iterator<Entry<String, String>> it = tmp.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, String> entry = it.next(); - result.put(entry.getKey(), entry.getValue()); - } - result.put(MetadataConstants.TABLE_EXD_STATUS_KEY, "true"); - } else { - 
result.put(MetadataConstants.TABLE_EXD_STATUS_KEY, "false"); + public TableExtDesc getTableExt(String tableName) throws IOException { + if (tableName.indexOf(".") < 0) + tableName = "DEFAULT." + tableName; + + TableExtDesc result = srcTableExdMap.get(tableName.toUpperCase()); + + // create new + if (null == result) { + result = new TableExtDesc(); + result.setName(tableName); + result.setUuid(UUID.randomUUID().toString()); + result.setLastModified(0); + result.init(); + saveTableExt(result); } return result; } + public void saveTableExt(TableExtDesc tableExt) throws IOException { + if (tableExt.getUuid() == null || tableExt.getName() == null) { + throw new IllegalArgumentException(); + } + + tableExt.init(); + + String path = tableExt.getResourcePath(); + getStore().putResource(path, tableExt, TABLE_EXT_SERIALIZER); + + srcTableExdMap.put(tableExt.getName(), tableExt); + } + + public void removeTableExt(String tableName) throws IOException { + String path = TableExtDesc.concatResourcePath(tableName); + getStore().deleteResource(path); + srcTableExdMap.remove(tableName); + } + public void saveSourceTable(TableDesc srcTable) throws IOException { if (srcTable.getUuid() == null || srcTable.getIdentity() == null) { throw new IllegalArgumentException(); @@ -261,7 +274,7 @@ public class MetadataManager { this.extFilterMap = new CaseInsensitiveStringCache<>(config, "external_filter"); reloadAllSourceTable(); - reloadAllSourceTableExd(); + reloadAllTableExt(); reloadAllDataModel(); reloadAllExternalFilter(); @@ -351,48 +364,38 @@ public class MetadataManager { } } - private void reloadAllSourceTableExd() throws IOException { + private void reloadAllTableExt() throws IOException { ResourceStore store = getStore(); - logger.debug("Reloading SourceTable exd info from folder " + store.getReadableResourcePath(ResourceStore.TABLE_EXD_RESOURCE_ROOT)); + logger.debug("Reloading Table_exd info from folder " + store.getReadableResourcePath(ResourceStore.TABLE_EXD_RESOURCE_ROOT)); 
srcTableExdMap.clear(); List<String> paths = store.collectResourceRecursively(ResourceStore.TABLE_EXD_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX); for (String path : paths) { - reloadSourceTableExdAt(path); + reloadTableExtAt(path); } logger.debug("Loaded " + srcTableExdMap.size() + " SourceTable EXD(s)"); } - @SuppressWarnings("unchecked") - private Map<String, String> reloadSourceTableExdAt(String path) throws IOException { - Map<String, String> attrs = Maps.newHashMap(); - + private TableExtDesc reloadTableExtAt(String path) throws IOException { ResourceStore store = getStore(); - RawResource res = store.getResource(path); - if (res == null) { - logger.warn("Failed to get table exd info from " + path); + TableExtDesc t = store.getResource(path, TableExtDesc.class, TABLE_EXT_SERIALIZER); + if (t == null) { return null; } + t.init(); - InputStream is = res.inputStream; + String name = t.getName(); - try { - attrs.putAll(JsonUtil.readValue(is, HashMap.class)); - } finally { - IOUtils.closeQuietly(is); + // remove old json + if (name == null) { + getStore().deleteResource(path); } - // parse table identity from file name - String file = path; - if (file.indexOf("/") > -1) { - file = file.substring(file.lastIndexOf("/") + 1); - } - String tableIdentity = file.substring(0, file.length() - MetadataConstants.FILE_SURFIX.length()).toUpperCase(); + srcTableExdMap.putLocal(name, t); - srcTableExdMap.putLocal(tableIdentity, attrs); - return attrs; + return t; } private void reloadAllExternalFilter() throws IOException { @@ -454,7 +457,7 @@ public class MetadataManager { } public void reloadSourceTableExt(String tableIdentity) throws IOException { - reloadSourceTableExdAt(TableDesc.concatExdResourcePath(tableIdentity)); + reloadTableExtAt(TableExtDesc.concatResourcePath(tableIdentity)); } public void reloadSourceTable(String tableIdentity) throws IOException { @@ -598,33 +601,4 @@ public class MetadataManager { return dataModelDesc; } - - public void saveTableExd(String 
tableId, Map<String, String> tableExdProperties) throws IOException { - if (tableId == null) { - throw new IllegalArgumentException("tableId couldn't be null"); - } - TableDesc srcTable = srcTableMap.get(tableId); - if (srcTable == null) { - throw new IllegalArgumentException("Couldn't find Source Table with identifier: " + tableId); - } - - String path = TableDesc.concatExdResourcePath(tableId); - - ByteArrayOutputStream os = new ByteArrayOutputStream(); - JsonUtil.writeValueIndent(os, tableExdProperties); - os.flush(); - InputStream is = new ByteArrayInputStream(os.toByteArray()); - getStore().putResource(path, is, System.currentTimeMillis()); - os.close(); - is.close(); - - srcTableExdMap.put(tableId, tableExdProperties); - } - - public void removeTableExd(String tableIdentity) throws IOException { - String path = TableDesc.concatExdResourcePath(tableIdentity); - getStore().deleteResource(path); - srcTableExdMap.remove(tableIdentity); - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java index 47ff3fe..4652cce 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -32,11 +32,11 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.metadata.streaming.StreamingConfig; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import 
org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.request.CardinalityRequest; import org.apache.kylin.rest.request.HiveTableRequest; @@ -91,7 +91,7 @@ public class TableController extends BasicController { */ @RequestMapping(value = "", method = { RequestMethod.GET }) @ResponseBody - public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) { + public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException { long start = System.currentTimeMillis(); List<TableDesc> tables = null; try { @@ -122,19 +122,6 @@ public class TableController extends BasicController { return cubeMgmtService.getMetadataManager().getTableDesc(tableName); } - /** - * Get available table list of the input database - * - * @return Table metadata array - * @throws IOException - */ - @RequestMapping(value = "/{tableName}/exd-map", method = { RequestMethod.GET }) - @ResponseBody - public Map<String, String> getHiveTableExd(@PathVariable String tableName) { - Map<String, String> tableExd = cubeMgmtService.getMetadataManager().getTableDescExd(tableName); - return tableExd; - } - @RequestMapping(value = "/reload", method = { RequestMethod.PUT }) @ResponseBody public String reloadSourceTable() { @@ -198,9 +185,9 @@ public class TableController extends BasicController { if (!modelService.isTableInModel(tableName, project)) { cubeMgmtService.removeTableFromProject(tableName, project); rtn = true; - }else{ + } else { List<String> models = modelService.getModelsUsingTable(tableName, project); - throw new InternalErrorException("Table is already in use by models "+models); + throw 
new InternalErrorException("Table is already in use by models " + models); } } catch (IOException e) { logger.error(e.getMessage(), e); @@ -254,7 +241,7 @@ public class TableController extends BasicController { */ @RequestMapping(value = "/{tableNames}/cardinality", method = { RequestMethod.PUT }) @ResponseBody - public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) { + public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); String[] tables = tableNames.split(","); for (String table : tables) { @@ -267,7 +254,7 @@ public class TableController extends BasicController { * @param tables * @return */ - private List<TableDesc> cloneTableDesc(List<TableDesc> tables) { + private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException { if (null == tables) { return Collections.emptyList(); } @@ -276,34 +263,31 @@ public class TableController extends BasicController { Iterator<TableDesc> it = tables.iterator(); while (it.hasNext()) { TableDesc table = it.next(); - Map<String, String> exd = cubeMgmtService.getMetadataManager().getTableDescExd(table.getIdentity()); - if (exd == null) { - descs.add(table); - } else { - // Clone TableDesc - TableDescResponse rtableDesc = new TableDescResponse(table); - rtableDesc.setDescExd(exd); - if (exd.containsKey(MetadataConstants.TABLE_EXD_CARDINALITY)) { - Map<String, Long> cardinality = new HashMap<String, Long>(); - String scard = exd.get(MetadataConstants.TABLE_EXD_CARDINALITY); - if (!StringUtils.isEmpty(scard)) { - String[] cards = StringUtils.split(scard, ","); - ColumnDesc[] cdescs = rtableDesc.getColumns(); - for (int i = 0; i < cdescs.length; i++) { - ColumnDesc columnDesc = cdescs[i]; - if (cards.length > i) { - cardinality.put(columnDesc.getName(), Long.parseLong(cards[i])); - 
} else { - logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length); - break; - } - } - rtableDesc.setCardinality(cardinality); + TableExtDesc tableExtDesc = cubeMgmtService.getMetadataManager().getTableExt(table.getIdentity()); + + // Clone TableDesc + TableDescResponse rtableDesc = new TableDescResponse(table); + rtableDesc.setDescExd(tableExtDesc); + + Map<String, Long> cardinality = new HashMap<String, Long>(); + String scard = tableExtDesc.getCardinality(); + if (!StringUtils.isEmpty(scard)) { + String[] cards = StringUtils.split(scard, ","); + ColumnDesc[] cdescs = rtableDesc.getColumns(); + for (int i = 0; i < cdescs.length; i++) { + ColumnDesc columnDesc = cdescs[i]; + if (cards.length > i) { + cardinality.put(columnDesc.getName(), Long.parseLong(cards[i])); + } else { + logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length); + break; } } - descs.add(rtableDesc); + rtableDesc.setCardinality(cardinality); } + descs.add(rtableDesc); } + return descs; } http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java index c3b1e7c..1bad096 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import com.fasterxml.jackson.annotation.JsonProperty; @@ -33,7 +34,7 @@ import 
com.fasterxml.jackson.annotation.JsonProperty; */ public class TableDescResponse extends TableDesc { @JsonProperty("exd") - Map<String, String> descExd = new HashMap<String, String>(); + TableExtDesc descExd; @JsonProperty("cardinality") Map<String, Long> cardinality = new HashMap<String, Long>(); @@ -55,7 +56,7 @@ public class TableDescResponse extends TableDesc { /** * @return the descExd */ - public Map<String, String> getDescExd() { + public TableExtDesc getDescExd() { return descExd; } @@ -63,7 +64,7 @@ public class TableDescResponse extends TableDesc { * @param descExd * the descExd to set */ - public void setDescExd(Map<String, String> descExd) { + public void setDescExd(TableExtDesc descExd) { this.descExd = descExd; } http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index a6246f8..c95f0ca 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -46,11 +46,12 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.project.ProjectInstance; import 
org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; @@ -449,12 +450,12 @@ public class CubeService extends BasicService { * @param tableName */ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) - public void calculateCardinality(String tableName, String submitter) { + public void calculateCardinality(String tableName, String submitter) throws IOException { String[] dbTableName = HadoopUtil.parseHiveTableName(tableName); tableName = dbTableName[0] + "." + dbTableName[1]; TableDesc table = getMetadataManager().getTableDesc(tableName); - final Map<String, String> tableExd = getMetadataManager().getTableDescExd(tableName); - if (tableExd == null || table == null) { + final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName); + if (table == null) { IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName); logger.error("Cannot find table descirptor " + tableName, e); throw e; @@ -483,6 +484,8 @@ public class CubeService extends BasicService { step2.setJobParams(param); step2.setParam("segmentId", tableName); job.addTask(step2); + tableExt.setJodID(job.getId()); + getMetadataManager().saveTableExt(tableExt); getExecutableManager().addJob(job); } @@ -576,9 +579,11 @@ public class CubeService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN) public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException { MetadataManager metaMgr = getMetadataManager(); + ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig()); for (String table : tables) { - Map<String, String> exdMap = metaMgr.getTableDescExd(table); - if (exdMap == null || !exdMap.containsKey(MetadataConstants.TABLE_EXD_CARDINALITY)) { + TableExtDesc tableExtDesc = metaMgr.getTableExt(table); + String jobID = tableExtDesc.getJodID(); + if (null == jobID 
|| ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) { calculateCardinality(table, submitter); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index a1aadab..7383c52 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -455,7 +455,12 @@ public class JobService extends BasicService implements InitializingBean { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public JobInstance cancelJob(JobInstance job) throws IOException, JobException { + if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube())) { + getExecutableManager().discardJob(job.getId()); + return job; + } CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube()); + // might not a cube job final String segmentIds = job.getRelatedSegment(); for (String segmentId : StringUtils.split(segmentIds)) { final CubeSegment segment = cubeInstance.getSegmentById(segmentId); http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java index 401e720..57292dc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java @@ -28,16 +28,15 @@ import java.util.UUID; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; @@ -80,7 +79,7 @@ public class HiveSourceTableLoader { public static void unLoadHiveTable(String hiveTable) throws IOException { MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); metaMgr.removeSourceTable(hiveTable); - metaMgr.removeTableExd(hiveTable); + metaMgr.removeTableExt(hiveTable); } private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException { @@ -132,27 +131,21 @@ public class HiveSourceTableLoader { partitionColumnString.append(hiveTableMeta.partitionColumns.get(i).name.toUpperCase()); } - Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity()); - - if (map == null) { - map = Maps.newHashMap(); - } - map.put(MetadataConstants.TABLE_EXD_TABLENAME, hiveTableMeta.tableName); - map.put(MetadataConstants.TABLE_EXD_LOCATION, hiveTableMeta.sdLocation); - map.put(MetadataConstants.TABLE_EXD_IF, hiveTableMeta.sdInputFormat); - map.put(MetadataConstants.TABLE_EXD_OF, hiveTableMeta.sdOutputFormat); - map.put(MetadataConstants.TABLE_EXD_OWNER, hiveTableMeta.owner); - map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(hiveTableMeta.lastAccessTime)); - 
map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString()); - map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(hiveTableMeta.fileSize)); - map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(hiveTableMeta.fileNum)); - map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(hiveTableMeta.partitionColumns.size() > 0).toString()); - + TableExtDesc tableExtDesc = metaMgr.getTableExt(tableDesc.getIdentity()); + tableExtDesc.setStorageLocation(hiveTableMeta.sdLocation); + tableExtDesc.setOwner(hiveTableMeta.owner); + tableExtDesc.setLastAccessTime(String.valueOf(hiveTableMeta.lastAccessTime)); + tableExtDesc.setPartitionColumn(partitionColumnString.toString()); + tableExtDesc.setTotalFileSize(String.valueOf(hiveTableMeta.fileSize)); + tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(hiveTableMeta.fileNum)); + tableExtDesc.addDataSourceProp("hive_inputFormat", hiveTableMeta.sdInputFormat); + tableExtDesc.addDataSourceProp("hive_outputFormat", hiveTableMeta.sdOutputFormat); + + metaMgr.saveTableExt(tableExtDesc); metaMgr.saveSourceTable(tableDesc); - metaMgr.saveTableExd(tableDesc.getIdentity(), map); + loadedTables.add(tableDesc.getIdentity()); } - return loadedTables; } http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index c7d694f..82c72ad 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -83,6 +83,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { Path 
output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set("dfs.block.size", "67108864"); + job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); // Mapper IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table); http://git-wip-us.apache.org/repos/asf/kylin/blob/40dca957/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java index 1997f7f..246822c 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityUpdateJob.java @@ -24,7 +24,6 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -39,8 +38,8 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.TableExtDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,9 +116,9 @@ public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob { if (scardi.length() > 0) { scardi = scardi.substring(0, scardi.length() - 1); MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - Map<String, String> 
tableExd = metaMgr.getTableDescExd(tableName); - tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, scardi); - metaMgr.saveTableExd(tableName.toUpperCase(), tableExd); + TableExtDesc tableExt = metaMgr.getTableExt(tableName); + tableExt.setCardinality(scardi); + metaMgr.saveTableExt(tableExt); } else { throw new IllegalArgumentException("No cardinality data is collected for table " + tableName); }
