This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 7519629 KYLIN-3835 [Defective TableSchemaUpdateChecker] Don't check used models when reload table 7519629 is described below commit 7519629d04c3b46c6f97acaaf7eddb5b8ceabe1e Author: yuzhang <shifengdefan...@163.com> AuthorDate: Thu Feb 28 12:47:14 2019 +0800 KYLIN-3835 [Defective TableSchemaUpdateChecker] Don't check used models when reload table --- .../kylin/metadata/model/DataModelManager.java | 2 +- .../rest/service/TableSchemaUpdateChecker.java | 160 +++++++++++++++++---- .../apache/kylin/rest/service/TableService.java | 2 +- 3 files changed, 136 insertions(+), 28 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java index 7f9fc5e..c1ffbf7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java @@ -192,7 +192,7 @@ public class DataModelManager { } // within a project, find models that use the specified table - public List<String> getModelsUsingTable(TableDesc table, String project) throws IOException { + public List<String> getModelsUsingTable(TableDesc table, String project) { try (AutoLock lock = modelMapLock.lockForRead()) { List<String> models = new ArrayList<>(); for (DataModelDesc modelDesc : getModels(project)) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java index 89a505a..46fa5ae 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableSchemaUpdateChecker.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.rest.service; @@ -27,6 +27,7 @@ import java.util.Set; import javax.annotation.Nullable; +import com.google.common.base.Preconditions; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.metadata.TableMetadataManager; @@ -34,6 +35,8 @@ import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.model.DataModelManager; +import org.apache.kylin.metadata.model.ModelDimensionDesc; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; @@ -44,6 +47,7 @@ import com.google.common.collect.Sets; public class TableSchemaUpdateChecker { private final TableMetadataManager metadataManager; private final CubeManager cubeManager; + private final DataModelManager dataModelManager; static class CheckResult { private final boolean valid; @@ -87,9 +91,10 @@ public class TableSchemaUpdateChecker { } } - TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager) { + TableSchemaUpdateChecker(TableMetadataManager metadataManager, CubeManager cubeManager, DataModelManager dataModelManager) { this.metadataManager = checkNotNull(metadataManager, "metadataManager is null"); this.cubeManager = checkNotNull(cubeManager, "cubeManager is null"); + this.dataModelManager = checkNotNull(dataModelManager, "dataModelManager is null"); } private List<CubeInstance> findCubeByTable(final TableDesc table) { @@ -133,8 +138,8 @@ public class TableSchemaUpdateChecker { * check whether all columns used in `cube` has compatible schema in current hive schema denoted by `fieldsMap`. * @param cube cube to check, must use `table` in its model * @param origTable kylin's table metadata - * @param fieldsMap current hive schema of `table` - * @return true if all columns used in `cube` has compatible schema with `fieldsMap`, false otherwise + * @param newTable current hive schema of `table` + * @return columns in origTable that can't be found in newTable */ private List<String> checkAllColumnsInCube(CubeInstance cube, TableDesc origTable, TableDesc newTable) { Set<ColumnDesc> usedColumns = Sets.newHashSet(); @@ -157,8 +162,8 @@ public class TableSchemaUpdateChecker { /** * check whether all columns in `table` are still in `fields` and have the same index as before. * - * @param table kylin's table metadata - * @param fields current table metadata in hive + * @param origTable kylin's table metadata + * @param newTable current table metadata in hive * @return true if only new columns are appended in hive, false otherwise */ private boolean checkAllColumnsInTableDesc(TableDesc origTable, TableDesc newTable) { @@ -182,35 +187,138 @@ public class TableSchemaUpdateChecker { if (existing == null) { return CheckResult.validOnFirstLoad(fullTableName); } - List<String> issues = Lists.newArrayList(); + + for (DataModelDesc usedModel : findModelByTable(newTableDesc, prj)){ + checkValidationInModel(newTableDesc, issues, usedModel); + } + for (CubeInstance cube : findCubeByTable(newTableDesc)) { - String modelName = cube.getModel().getName(); - - // if user reloads a fact table used by cube, then all used columns must match current schema - if (cube.getModel().isFactTable(fullTableName)) { - TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - List<String> violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc); - if (!violateColumns.isEmpty()) { - issues.add(format(Locale.ROOT, "Column %s used in cube[%s] and model[%s], but changed " + "in hive", - violateColumns, cube.getName(), modelName)); + checkValidationInCube(newTableDesc, issues, cube); + } + + if (issues.isEmpty()) { + return CheckResult.validOnCompatibleSchema(fullTableName); + } + return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); + } + + private Iterable<? extends DataModelDesc> findModelByTable(TableDesc newTableDesc, String prj) { + List<DataModelDesc> usedModels = Lists.newArrayList(); + List<String> modelNames = dataModelManager.getModelsUsingTable(newTableDesc, prj); + modelNames.stream() + .map(mn -> dataModelManager.getDataModelDesc(mn)) + .filter(m -> null != m) + .forEach(m -> usedModels.add(m)); + + return usedModels; + } + + private void checkValidationInCube(TableDesc newTableDesc, List<String> issues, CubeInstance cube) { + final String fullTableName = newTableDesc.getIdentity(); + String modelName = cube.getModel().getName(); + // if user reloads a fact table used by cube, then all used columns must match current schema + if (cube.getModel().isFactTable(fullTableName)) { + TableDesc factTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + List<String> violateColumns = checkAllColumnsInCube(cube, factTable, newTableDesc); + if (!violateColumns.isEmpty()) { + issues.add(format(Locale.ROOT, "Column %s used in cube[%s] and model[%s], but changed " + "in hive", + violateColumns, cube.getName(), modelName)); + } + } + + // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns + // must be the same (except compatible type changes) + if (cube.getModel().isLookupTable(fullTableName)) { + TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); + if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { + issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in cube[%s] and model[%s], but " + + "changed in " + "hive, only append operation are supported on hive table as lookup table", + lookupTable.getIdentity(), cube.getName(), modelName)); + } + } + } + + private void checkValidationInModel(TableDesc newTableDesc, List<String> issues, DataModelDesc usedModel){ + final String fullTableName = newTableDesc.getIdentity(); + // if user reloads a fact table used by model, then all used columns must match current schema + if (usedModel.isFactTable(fullTableName)) { + TableDesc factTable = usedModel.findFirstTable(fullTableName).getTableDesc(); + List<String> violateColumns = checkAllColumnsInFactTable(usedModel, factTable, newTableDesc); + if (!violateColumns.isEmpty()) { + issues.add(format(Locale.ROOT, "Column %s used in model[%s], but changed " + "in hive", + violateColumns, usedModel.getName())); + } + } + + // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns + // must be the same (except compatible type changes) + if (usedModel.isLookupTable(fullTableName)) { + TableDesc lookupTable = usedModel.findFirstTable(fullTableName).getTableDesc(); + if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { + issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in model[%s], but " + + "changed in " + "hive, only append operation are supported on hive table as lookup table", + lookupTable.getIdentity(), usedModel.getName())); + } + } + } + + private List<String> checkAllColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable, TableDesc newTableDesc) { + List<String> violateColumns = Lists.newArrayList(); + + for (ColumnDesc column : findUsedColumnsInFactTable(usedModel, factTable)) { + if (!column.isComputedColumn()) { + ColumnDesc newCol = newTableDesc.findColumnByName(column.getName()); + if (newCol == null || !isColumnCompatible(column, newCol)) { + violateColumns.add(column.getName()); } } + } + return violateColumns; + } - // if user reloads a lookup table used by cube, only append column(s) are allowed, all existing columns - // must be the same (except compatible type changes) - if (cube.getModel().isLookupTable(fullTableName)) { - TableDesc lookupTable = cube.getModel().findFirstTable(fullTableName).getTableDesc(); - if (!checkAllColumnsInTableDesc(lookupTable, newTableDesc)) { - issues.add(format(Locale.ROOT, "Table '%s' is used as Lookup Table in cube[%s] and model[%s], but " - + "changed in " + "hive", lookupTable.getIdentity(), cube.getName(), modelName)); + // get table name from column full name + private String getTableName(String columnName) { + int lastIndexOfDot = columnName.lastIndexOf('.'); + String tableName = null; + if (lastIndexOfDot >= 0) { + tableName = columnName.substring(0, lastIndexOfDot); + } else { + return null; + } + // maybe contain db name + lastIndexOfDot = tableName.lastIndexOf('.'); + if (lastIndexOfDot >= 0) { + tableName = tableName.substring(lastIndexOfDot + 1); + } + return tableName; + } + + private ColumnDesc mustGetColumnDesc(TableDesc factTable, String columnName) { + ColumnDesc columnDesc = factTable.findColumnByName(columnName); + Preconditions.checkNotNull(columnDesc, + format(Locale.ROOT, "Can't find column %s in current fact table %s.", columnName, factTable.getIdentity())); + return columnDesc; + } + + private Set<ColumnDesc> findUsedColumnsInFactTable(DataModelDesc usedModel, TableDesc factTable) { + Set<ColumnDesc> usedColumns = Sets.newHashSet(); + // column in dimension + for (ModelDimensionDesc dim : usedModel.getDimensions()) { + if (dim.getTable().equalsIgnoreCase(factTable.getName())) { + for (String col : dim.getColumns()) { + usedColumns.add(mustGetColumnDesc(factTable, col)); } } } - if (issues.isEmpty()) { - return CheckResult.validOnCompatibleSchema(fullTableName); + // column in measure + for (String columnInMeasure : usedModel.getMetrics()) { + if (factTable.getName().equalsIgnoreCase(getTableName(columnInMeasure))) { + usedColumns.add(mustGetColumnDesc(factTable, columnInMeasure)); + } } - return CheckResult.invalidOnIncompatibleSchema(fullTableName, issues); + + return usedColumns; } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index e692193..f5c6d2d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -146,7 +146,7 @@ public class TableService extends BasicService { // do schema check TableMetadataManager metaMgr = getTableManager(); CubeManager cubeMgr = getCubeManager(); - TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr); + TableSchemaUpdateChecker checker = new TableSchemaUpdateChecker(metaMgr, cubeMgr, getDataModelManager()); for (Pair<TableDesc, TableExtDesc> pair : allMeta) { TableDesc tableDesc = pair.getFirst(); TableSchemaUpdateChecker.CheckResult result = checker.allowReload(tableDesc, project);