port KYLIN-2012 to new interface introduced in KYLIN-2125
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd2a06a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd2a06a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd2a06a5

Branch: refs/heads/v1.6.0-rc1-hbase1.x
Commit: cd2a06a5d373bdd5cfa90e78649d42e891711c43
Parents: 553d7c5
Author: Hongbin Ma <mahong...@apache.org>
Authored: Wed Oct 26 14:04:56 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Oct 27 08:34:11 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |  5 ++-
 .../kylin/source/hive/BeelineHiveClient.java    |  2 +-
 .../source/hive/HiveSourceTableLoader.java      | 32 +++++++--------
 .../apache/kylin/source/hive/SchemaChecker.java | 41 ++++++++------------
 4 files changed, 36 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 4a24ad2..54feb24 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -45,8 +45,9 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.source.hive.HiveClientFactory;
 import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.IHiveClient;
 import org.apache.kylin.source.kafka.TimedJsonStreamParser;
 import org.apache.maven.model.Model;
 import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
@@ -234,7 +235,7 @@ public class DeployUtil {
         String tableFileDir = temp.getParent();
         temp.delete();
 
-        HiveClient hiveClient = new HiveClient();
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
         // create hive tables
         hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW");
         hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase())));

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index c8d56a5..a84aeb1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -207,7 +207,7 @@ public class BeelineHiveClient implements IHiveClient {
         BeelineHiveClient loader = new BeelineHiveClient("-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'");
         //BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " "));
-        HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test001");
+        HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part");
         System.out.println(hiveTableMeta);
         loader.close();
     }
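The seam the two diffs above port callers onto: call sites obtain an IHiveClient from HiveClientFactory instead of constructing HiveClient directly. A minimal sketch, showing only the two methods this commit exercises; the factory's dispatch between the CLI- and Beeline-backed clients is an assumption here, not something this diff shows (see KYLIN-2125 for the real sources):

    // Sketch only -- not the actual Kylin sources.
    interface IHiveClient {
        void executeHQL(String hql) throws Exception; // used by DeployUtil above
        HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception;
    }

    class HiveClientFactory {
        static IHiveClient getHiveClient() {
            // Assumed: picks an implementation from Kylin configuration;
            // the connection string below is purely illustrative.
            return new BeelineHiveClient("-u 'jdbc:hive2://localhost:10000'");
        }
    }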
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 388e72b..401e720 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
@@ -34,8 +35,10 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 
 /**
@@ -49,27 +52,25 @@ public class HiveSourceTableLoader {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
-    public static final String OUTPUT_SURFIX = "json";
-    public static final String TABLE_FOLDER_NAME = "table";
-    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
     public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-        Map<String, Set<String>> db2tables = Maps.newHashMap();
-        for (String table : hiveTables) {
-            String[] parts = HadoopUtil.parseHiveTableName(table);
-            Set<String> set = db2tables.get(parts[0]);
-            if (set == null) {
-                set = Sets.newHashSet();
-                db2tables.put(parts[0], set);
-            }
-            set.add(parts[1]);
+        SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
+        for (String fullTableName : hiveTables) {
+            String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
+            db2tables.put(parts[0], parts[1]);
+        }
+
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config));
+        for (Map.Entry<String, String> entry : db2tables.entries()) {
+            SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue());
+            result.raiseExceptionWhenInvalid();
         }
 
         // extract from hive
         Set<String> loadedTables = Sets.newHashSet();
         for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+            List<String> loaded = extractHiveTables(database, db2tables.get(database), hiveClient);
             loadedTables.addAll(loaded);
         }
 
@@ -82,12 +83,11 @@ public class HiveSourceTableLoader {
         metaMgr.removeTableExd(hiveTable);
     }
 
-    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+    private static List<String> extractHiveTables(String database, Set<String> tables, IHiveClient hiveClient) throws IOException {
 
         List<String> loadedTables = Lists.newArrayList();
         MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
         for (String tableName : tables) {
-            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
             HiveTableMeta hiveTableMeta;
             try {
                 hiveTableMeta = hiveClient.getHiveTableMeta(database, tableName);
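The LinkedHashMultimap in reloadHiveTables() replaces roughly ten lines of hand-rolled Map<String, Set<String>> bookkeeping. A self-contained illustration of the Guava idiom (the table names are made up):

    import java.util.Map;

    import com.google.common.collect.LinkedHashMultimap;
    import com.google.common.collect.SetMultimap;

    public class MultimapIdiom {
        public static void main(String[] args) {
            // One put() per (database, table) pair; set semantics drop duplicates,
            // and insertion order is preserved, so checks run in request order.
            SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
            for (String fullTableName : new String[] { "EDW.FACT", "DEFAULT.DIM", "EDW.FACT" }) {
                String[] parts = fullTableName.split("\\.");
                db2tables.put(parts[0], parts[1]);
            }
            // entries() visits each unique pair exactly once -- the shape of
            // the per-table allowReload() loop above.
            for (Map.Entry<String, String> entry : db2tables.entries()) {
                System.out.println(entry.getKey() + "." + entry.getValue());
            }
            // keySet()/get(db) give the per-database view passed to extractHiveTables().
            System.out.println(db2tables.get("EDW")); // prints [FACT]
        }
    }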
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd2a06a5/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
index 319ebee..87a8870 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SchemaChecker.java
@@ -27,8 +27,6 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -46,7 +44,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class SchemaChecker {
-    private final HiveClient hiveClient;
+    private final IHiveClient hiveClient;
     private final MetadataManager metadataManager;
     private final CubeManager cubeManager;
 
@@ -87,23 +85,16 @@ public class SchemaChecker {
         }
     }
 
-    SchemaChecker(HiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
+    SchemaChecker(IHiveClient hiveClient, MetadataManager metadataManager, CubeManager cubeManager) {
         this.hiveClient = checkNotNull(hiveClient, "hiveClient is null");
         this.metadataManager = checkNotNull(metadataManager, "metadataManager is null");
         this.cubeManager = checkNotNull(cubeManager, "cubeManager is null");
     }
 
-    private List<FieldSchema> fetchSchema(String dbName, String tblName) throws Exception {
-        List<FieldSchema> fields = Lists.newArrayList();
-        fields.addAll(hiveClient.getHiveTableFields(dbName, tblName));
-
-        Table table = hiveClient.getHiveTable(dbName, tblName);
-        List<FieldSchema> partitionFields = table.getPartitionKeys();
-        if (partitionFields != null) {
-            fields.addAll(partitionFields);
-        }
-
-        return fields;
+    private List<HiveTableMeta.HiveTableColumnMeta> fetchSchema(String dbName, String tblName) throws Exception {
+        List<HiveTableMeta.HiveTableColumnMeta> columnMetas = Lists.newArrayList();
+        columnMetas.addAll(hiveClient.getHiveTableMeta(dbName, tblName).allColumns);
+        return columnMetas;
     }
 
     private List<CubeInstance> findCubeByTable(final String fullTableName) {
@@ -128,12 +119,12 @@
         return ImmutableList.copyOf(relatedCubes);
     }
 
-    private boolean isColumnCompatible(ColumnDesc column, FieldSchema field) {
-        if (!column.getName().equalsIgnoreCase(field.getName())) {
+    private boolean isColumnCompatible(ColumnDesc column, HiveTableMeta.HiveTableColumnMeta field) {
+        if (!column.getName().equalsIgnoreCase(field.name)) {
             return false;
         }
 
-        String typeStr = field.getType();
+        String typeStr = field.dataType;
         // kylin uses double internally for float, see HiveSourceTableLoader.java
         // TODO should this normalization to be in DataType class ?
         if ("float".equalsIgnoreCase(typeStr)) {
@@ -159,7 +150,7 @@
      * @param fieldsMap current hive schema of `table`
     * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise
      */
-    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, FieldSchema> fieldsMap) {
+    private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc table, Map<String, HiveTableMeta.HiveTableColumnMeta> fieldsMap) {
         Set<ColumnDesc> usedColumns = Sets.newHashSet();
         for (TblColRef col : cube.getAllColumns()) {
             usedColumns.add(col.getColumnDesc());
@@ -168,7 +159,7 @@
         List<String> violateColumns = Lists.newArrayList();
         for (ColumnDesc column : table.getColumns()) {
             if (usedColumns.contains(column)) {
-                FieldSchema field = fieldsMap.get(column.getName());
+                HiveTableMeta.HiveTableColumnMeta field = fieldsMap.get(column.getName());
                 if (field == null || !isColumnCompatible(column, field)) {
                     violateColumns.add(column.getName());
                 }
@@ -184,7 +175,7 @@
      * @param fields current table metadata in hive
      * @return true if only new columns are appended in hive, false otherwise
      */
-    private boolean checkAllColumnsInTableDesc(TableDesc table, List<FieldSchema> fields) {
+    private boolean checkAllColumnsInTableDesc(TableDesc table, List<HiveTableMeta.HiveTableColumnMeta> fields) {
         if (table.getColumnCount() > fields.size()) {
             return false;
         }
@@ -206,15 +197,15 @@
             return CheckResult.validOnFirstLoad(fullTableName);
         }
 
-        List<FieldSchema> currentFields;
-        Map<String, FieldSchema> currentFieldsMap = Maps.newHashMap();
+        List<HiveTableMeta.HiveTableColumnMeta> currentFields;
+        Map<String, HiveTableMeta.HiveTableColumnMeta> currentFieldsMap = Maps.newHashMap();
         try {
             currentFields = fetchSchema(dbName, tblName);
         } catch (Exception e) {
             return CheckResult.invalidOnFetchSchema(fullTableName, e);
         }
-        for (FieldSchema field : currentFields) {
-            currentFieldsMap.put(field.getName().toUpperCase(), field);
+        for (HiveTableMeta.HiveTableColumnMeta field : currentFields) {
+            currentFieldsMap.put(field.name.toUpperCase(), field);
         }
 
         List<String> issues = Lists.newArrayList();
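The net effect of the SchemaChecker changes: the per-column compatibility test now reads name and type from HiveTableMeta.HiveTableColumnMeta's public fields instead of the metastore's FieldSchema getters. A hedged, self-contained sketch of the rule visible in the hunks above; the body of the float branch is truncated in this diff, so the normalization line is an assumption, and the real method compares against Kylin's ColumnDesc rather than plain strings:

    public class ColumnCompatibilitySketch {
        // Stand-in for HiveTableMeta.HiveTableColumnMeta and its public fields.
        static class HiveTableColumnMeta {
            final String name;
            final String dataType;
            HiveTableColumnMeta(String name, String dataType) {
                this.name = name;
                this.dataType = dataType;
            }
        }

        static boolean isColumnCompatible(String kylinName, String kylinType, HiveTableColumnMeta field) {
            // Names match case-insensitively.
            if (!kylinName.equalsIgnoreCase(field.name)) {
                return false;
            }
            String typeStr = field.dataType;
            // Kylin uses double internally for float (per the diff's comment);
            // the exact statement is assumed since the diff truncates here.
            if ("float".equalsIgnoreCase(typeStr)) {
                typeStr = "double";
            }
            return kylinType.equalsIgnoreCase(typeStr);
        }

        public static void main(String[] args) {
            System.out.println(isColumnCompatible("PRICE", "double", new HiveTableColumnMeta("price", "float")));  // true
            System.out.println(isColumnCompatible("PRICE", "double", new HiveTableColumnMeta("price", "string"))); // false
        }
    }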