This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f3f114  check derived column for stream table
3f3f114 is described below

commit 3f3f114400e49cf8bd9ec48c888b253c81994f79
Author: liukun4515 <liu...@apache.org>
AuthorDate: Thu Mar 4 18:40:02 2021 +0800

    check derived column for stream table
---
 .../kylin/rest/controller/StreamingV2Controller.java    | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index 1b5a81e..0a0cc62 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -34,7 +34,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -196,10 +195,12 @@ public class StreamingV2Controller extends 
BasicController {
         // validate the compatibility for input table schema and the underline 
hive table schema
         if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) {
             List<FieldSchema> fields;
+            List<FieldSchema> partitionFields;
             String db = tableDesc.getDatabase();
             try {
                 HiveMetaStoreClient metaStoreClient = new 
HiveMetaStoreClient(new HiveConf());
                 fields = metaStoreClient.getFields(db, tableDesc.getName());
+                partitionFields = metaStoreClient.getTable(db, 
tableDesc.getName()).getPartitionKeys();
                 logger.info("Checking the {} in {}", tableDesc.getName(), db);
             } catch (NoSuchObjectException noObjectException) {
                 logger.info("table not exist in hive meta store for table:" + 
tableDesc.getIdentity(),
@@ -216,18 +217,16 @@ public class StreamingV2Controller extends 
BasicController {
             for (FieldSchema field : fields) {
                 fieldSchemaMap.put(field.getName().toUpperCase(Locale.ROOT), 
field);
             }
+            // partition column
+            for (FieldSchema field : partitionFields) {
+                fieldSchemaMap.put(field.getName().toUpperCase(Locale.ROOT), 
field);
+            }
             List<String> incompatibleMsgs = Lists.newArrayList();
             for (ColumnDesc columnDesc : tableDesc.getColumns()) {
                 FieldSchema fieldSchema = 
fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
                 if (fieldSchema == null) {
-                    // Partition column cannot be fetched via Hive Metadata 
API.
-                    if 
(!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
-                        incompatibleMsgs.add("Column not exist in hive table:" 
+ columnDesc.getName());
-                        continue;
-                    } else {
-                        logger.info("Column not exist in hive table: {}.", 
columnDesc.getName());
-                        continue;
-                    }
+                    incompatibleMsgs.add("Column not exist in hive table:" + 
columnDesc.getName());
+                    continue;
                 }
                 if (!checkHiveTableFieldCompatible(fieldSchema, columnDesc)) {
                     String msg = String.format(Locale.ROOT,

Reply via email to