/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.rest.service;

import static org.apache.kylin.common.util.CheckUtil.checkCondition;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.ModelDimensionDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.metrics.QueryMetricsFacade;
import org.apache.kylin.rest.model.ColumnMetaWithType;
import org.apache.kylin.rest.model.SelectedColumnMeta;
import org.apache.kylin.rest.model.TableMetaWithType;
import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.request.PrepareSqlRequest;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.TableauInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

import com.google.common.collect.Lists;

import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;

/**
 * V2 flavour of {@link QueryService}: runs SQL through the project's OLAP data source
 * (with optional result caching) and exposes JDBC metadata enriched with the table and
 * column roles declared by the project's data models.
 *
 * Created by luwei on 17-4-24.
 */
@Component("queryServiceV2")
public class QueryServiceV2 extends QueryService {
    private static final Logger logger = LoggerFactory.getLogger(QueryServiceV2.class);

    @Autowired
    @Qualifier("cacheService")
    private CacheService cacheService;

    @Autowired
    @Qualifier("modelMgmtServiceV2")
    private ModelServiceV2 modelServiceV2;

    /**
     * Executes the request, serving it from the query cache when possible and storing
     * the fresh response in the cache when it is considered expensive enough.
     *
     * @param sqlRequest the query request; its project must not be blank
     * @return the (possibly cached) successful response
     * @throws BadRequestException if the server mode does not allow queries, the project
     *         name is blank, or the SQL is missing / does not contain "select"
     * @throws InternalErrorException if the query itself failed; the message is the
     *         user-friendly form of the underlying error
     */
    public SQLResponse doQueryWithCache(SQLRequest sqlRequest) {
        Message msg = MsgPicker.getMsg();

        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        String serverMode = kylinConfig.getServerMode();
        String normalizedMode = serverMode.toLowerCase();
        if (!(Constant.SERVER_MODE_QUERY.equals(normalizedMode) || Constant.SERVER_MODE_ALL.equals(normalizedMode))) {
            throw new BadRequestException(String.format(msg.getQUERY_NOT_ALLOWED(), serverMode));
        }
        if (StringUtils.isBlank(sqlRequest.getProject())) {
            throw new BadRequestException(msg.getEMPTY_PROJECT_NAME());
        }

        if (sqlRequest.getBackdoorToggles() != null)
            BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());

        final QueryContext queryContext = QueryContext.current();

        try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
            String sql = sqlRequest.getSql();
            String project = sqlRequest.getProject();
            logger.info("Using project: " + project);
            logger.info("The original query: " + sql);

            // A missing SQL string is rejected the same way as a non-select statement
            // instead of surfacing as a NullPointerException.
            if (sql == null || !sql.toLowerCase().contains("select")) {
                logger.debug("Directly return exception as not supported");
                throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
            }

            long startTime = System.currentTimeMillis();

            SQLResponse sqlResponse = null;
            boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), "query cache disabled in KylinConfig") && //
                    checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");

            if (queryCacheEnabled) {
                sqlResponse = searchQueryInCache(sqlRequest);
            }

            try {
                if (null == sqlResponse) {
                    sqlResponse = query(sqlRequest);

                    sqlResponse.setDuration(System.currentTimeMillis() - startTime);
                    logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
                            String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), String.valueOf(sqlResponse.getTotalScanCount()));
                    if (shouldCache(kylinConfig, queryCacheEnabled, sqlResponse)) {
                        cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest, sqlResponse));
                    }

                } else {
                    // cache hit: nothing was scanned for this request
                    sqlResponse.setDuration(System.currentTimeMillis() - startTime);
                    sqlResponse.setTotalScanCount(0);
                    sqlResponse.setTotalScanBytes(0);
                }

                checkQueryAuth(sqlResponse);

            } catch (Throwable e) { // calcite may throw AssertError
                logger.error("Exception when execute sql", e);
                String errMsg = QueryUtil.makeErrorMsgUserFriendly(e);

                sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
                sqlResponse.setTotalScanCount(queryContext.getScannedRows());
                sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());

                // instanceof is already false for a null cause
                if (queryCacheEnabled && e.getCause() instanceof ResourceLimitExceededException) {
                    Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
                    exceptionCache.put(new Element(sqlRequest, sqlResponse));
                }
            }

            logQuery(sqlRequest, sqlResponse);

            QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);

            if (sqlResponse.getIsException())
                throw new InternalErrorException(sqlResponse.getExceptionMessage());

            return sqlResponse;

        } finally {
            BackdoorToggles.cleanToggles();
            QueryContext.reset();
        }
    }

    /**
     * Decides whether a freshly computed response goes into the success cache: caching must
     * be enabled, the response must not be an exception, at least one of duration / scan
     * count / scan bytes must exceed its configured threshold, and the result must be
     * smaller than the large-query threshold. Each failed condition is reported through
     * {@code checkCondition} with the message given here.
     */
    private boolean shouldCache(KylinConfig kylinConfig, boolean queryCacheEnabled, SQLResponse sqlResponse) {
        long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
        long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
        long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();

        return checkCondition(queryCacheEnabled, "query cache is disabled") //
                && checkCondition(!sqlResponse.getIsException(), "query has exception") //
                && checkCondition(sqlResponse.getDuration() > durationThreshold || sqlResponse.getTotalScanCount() > scanCountThreshold || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
                        "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
                && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), "query response is too large: {} ({})", sqlResponse.getResults().size(), kylinConfig.getLargeQueryThreshold());
    }

    /**
     * Runs the query while it is registered with the bad-query detector; the registration
     * is always removed again, whether the query succeeds or fails.
     *
     * @param sqlRequest the query request
     * @return the query response
     * @throws Exception whatever the underlying execution throws
     */
    public SQLResponse query(SQLRequest sqlRequest) throws Exception {
        try {
            final String user = SecurityContextHolder.getContext().getAuthentication().getName();
            badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user);

            return queryWithSqlMassage(sqlRequest);

        } finally {
            badQueryDetector.queryEnd(Thread.currentThread());
        }
    }

    /**
     * Short-circuits Tableau probe queries, massages the SQL (limit/offset), publishes the
     * caller's name and authorities plus the accept-partial flag as OLAP context
     * parameters, and executes the massaged statement. The request object is not modified.
     */
    private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
        String userInfo = SecurityContextHolder.getContext().getAuthentication().getName();
        final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext().getAuthentication().getAuthorities();
        for (GrantedAuthority grantedAuthority : grantedAuthorities) {
            userInfo += ",";
            userInfo += grantedAuthority.getAuthority();
        }

        SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(sqlRequest.getSql());
        if (null != fakeResponse) {
            logger.debug("Return fake response, is exception? " + fakeResponse.getIsException());
            return fakeResponse;
        }

        String correctedSql = QueryUtil.massageSql(sqlRequest.getSql(), sqlRequest.getLimit(), sqlRequest.getOffset());
        if (!correctedSql.equals(sqlRequest.getSql())) {
            logger.info("The corrected query: " + correctedSql);

            //CAUTION: should not change sqlRequest content!
            //sqlRequest.setSql(correctedSql);
        }

        // add extra parameters into olap context, like acceptPartial
        Map<String, String> parameters = new HashMap<String, String>();
        parameters.put(OLAPContext.PRM_USER_AUTHEN_INFO, userInfo);
        parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial()));
        OLAPContext.setParameters(parameters);
        // force clear the query context before a new query
        OLAPContext.clearThreadLocalContexts();

        return execute(correctedSql, sqlRequest);

    }

    /**
     * Executes the statement against the project's OLAP data source and materialises the
     * whole result set as strings.
     *
     * @param correctedSql the SQL to run (already massaged)
     * @param sqlRequest the originating request; a {@link PrepareSqlRequest} is executed as
     *        a prepared statement with its parameters bound in order
     * @return a non-exception response carrying column metadata, rows, the name of the
     *         last realization seen in the OLAP contexts, and scan statistics
     * @throws Exception on any JDBC or query-engine failure
     */
    private SQLResponse execute(String correctedSql, SQLRequest sqlRequest) throws Exception {
        Connection conn = null;
        Statement stat = null;
        ResultSet resultSet = null;

        List<List<String>> results = Lists.newArrayList();
        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();

        try {
            conn = cacheService.getOLAPDataSource(sqlRequest.getProject()).getConnection();

            if (sqlRequest instanceof PrepareSqlRequest) {
                PreparedStatement preparedState = conn.prepareStatement(correctedSql);
                // Track the prepared statement in 'stat' so the finally block closes it;
                // previously it was never closed on this path.
                stat = preparedState;
                processStatementAttr(preparedState, sqlRequest);

                PrepareSqlRequest.StateParam[] params = ((PrepareSqlRequest) sqlRequest).getParams();
                if (params != null) {
                    for (int i = 0; i < params.length; i++) {
                        // JDBC parameter indexes are 1-based
                        setParam(preparedState, i + 1, params[i]);
                    }
                }

                resultSet = preparedState.executeQuery();
            } else {
                stat = conn.createStatement();
                processStatementAttr(stat, sqlRequest);
                resultSet = stat.executeQuery(correctedSql);
            }

            ResultSetMetaData metaData = resultSet.getMetaData();
            int columnCount = metaData.getColumnCount();

            // Fill in selected column meta
            for (int i = 1; i <= columnCount; ++i) {
                columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), metaData.isSearchable(i), metaData.isCurrency(i), metaData.isNullable(i), metaData.isSigned(i), metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), metaData.getSchemaName(i), metaData.getCatalogName(i), metaData.getTableName(i), metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), metaData.getColumnTypeName(i), metaData.isReadOnly(i), metaData.isWritable(i), metaData.isDefinitelyWritable(i)));
            }

            // fill in results
            while (resultSet.next()) {
                List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
                for (int i = 1; i <= columnCount; i++) {
                    oneRow.add(resultSet.getString(i));
                }

                results.add(oneRow);
            }
        } finally {
            close(resultSet, stat, conn);
        }

        boolean isPartialResult = false;
        String cube = "";
        StringBuilder sb = new StringBuilder("Processed rows for each storageContext: ");
        if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
            for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) {
                if (ctx.realization != null) {
                    isPartialResult |= ctx.storageContext.isPartialResultReturned();
                    cube = ctx.realization.getName();
                    sb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
                }
            }
        }
        logger.info(sb.toString());

        SQLResponse response = new SQLResponse(columnMetas, results, cube, 0, false, null, isPartialResult);
        response.setTotalScanCount(QueryContext.current().getScannedRows());
        response.setTotalScanBytes(QueryContext.current().getScannedBytes());

        return response;
    }

    /**
     * Binds one prepared-statement parameter, converting its string value according to the
     * Java class named by the parameter.
     *
     * Note: a null value is bound as SQL NULL for string, date, time, timestamp and
     * object parameters, but as 0 / false for primitive-like numeric and boolean ones.
     *
     * @param preparedState the statement to bind on
     * @param index 1-based parameter index
     * @param param the parameter (class name + string value)
     * @throws SQLException if the driver rejects the binding
     * @throws InternalErrorException if the parameter's class name cannot be loaded
     */
    private void setParam(PreparedStatement preparedState, int index, PrepareSqlRequest.StateParam param) throws SQLException {
        boolean isNull = (null == param.getValue());

        Class<?> clazz;
        try {
            clazz = Class.forName(param.getClassName());
        } catch (ClassNotFoundException e) {
            throw new InternalErrorException(e);
        }

        ColumnMetaData.Rep rep = ColumnMetaData.Rep.of(clazz);

        switch (rep) {
        case PRIMITIVE_CHAR:
        case CHARACTER:
        case STRING:
            preparedState.setString(index, isNull ? null : String.valueOf(param.getValue()));
            break;
        case PRIMITIVE_INT:
        case INTEGER:
            preparedState.setInt(index, isNull ? 0 : Integer.valueOf(param.getValue()));
            break;
        case PRIMITIVE_SHORT:
        case SHORT:
            preparedState.setShort(index, isNull ? 0 : Short.valueOf(param.getValue()));
            break;
        case PRIMITIVE_LONG:
        case LONG:
            preparedState.setLong(index, isNull ? 0 : Long.valueOf(param.getValue()));
            break;
        case PRIMITIVE_FLOAT:
        case FLOAT:
            preparedState.setFloat(index, isNull ? 0 : Float.valueOf(param.getValue()));
            break;
        case PRIMITIVE_DOUBLE:
        case DOUBLE:
            preparedState.setDouble(index, isNull ? 0 : Double.valueOf(param.getValue()));
            break;
        case PRIMITIVE_BOOLEAN:
        case BOOLEAN:
            preparedState.setBoolean(index, !isNull && Boolean.parseBoolean(param.getValue()));
            break;
        case PRIMITIVE_BYTE:
        case BYTE:
            preparedState.setByte(index, isNull ? 0 : Byte.valueOf(param.getValue()));
            break;
        case JAVA_UTIL_DATE:
        case JAVA_SQL_DATE:
            preparedState.setDate(index, isNull ? null : java.sql.Date.valueOf(param.getValue()));
            break;
        case JAVA_SQL_TIME:
            preparedState.setTime(index, isNull ? null : Time.valueOf(param.getValue()));
            break;
        case JAVA_SQL_TIMESTAMP:
            preparedState.setTimestamp(index, isNull ? null : Timestamp.valueOf(param.getValue()));
            break;
        default:
            preparedState.setObject(index, isNull ? null : param.getValue());
        }
    }

    /**
     * Returns the exposed (cubed-only) table metadata of a project, with table and column
     * roles filled in from the project's models.
     *
     * @param project project name; a blank name yields an empty list
     */
    public List<TableMetaWithType> getMetadataV2(String project) throws SQLException, IOException {
        return getMetadataV2(getCubeManager(), project, true);
    }

    /**
     * Reads table and column metadata through the project's JDBC data source and tags it
     * with FACT/LOOKUP table types and PK/FK/DIMENSION/MEASURE column types taken from the
     * project's data models.
     *
     * @param cubeMgr not used by this implementation
     * @param project project name; a blank name yields an empty list
     * @param cubedOnly when true, only tables/columns the project manager reports as
     *        exposed are returned
     * @return the table metadata, each table carrying its columns
     * @throws SQLException on JDBC metadata failures
     * @throws IOException declared for model lookups
     */
    protected List<TableMetaWithType> getMetadataV2(CubeManager cubeMgr, String project, boolean cubedOnly) throws SQLException, IOException {
        //Message msg = MsgPicker.getMsg();

        if (StringUtils.isBlank(project)) {
            return Collections.emptyList();
        }

        List<TableMetaWithType> tableMetas = new LinkedList<TableMetaWithType>();
        // keyed by "SCHEMA#TABLE"
        Map<String, TableMetaWithType> tableMap = new HashMap<String, TableMetaWithType>();
        // keyed by "SCHEMA#TABLE#COLUMN"
        Map<String, ColumnMetaWithType> columnMap = new HashMap<String, ColumnMetaWithType>();

        Connection conn = null;
        ResultSet columnMeta = null;
        ResultSet jdbcTableMeta = null;
        try {
            DataSource dataSource = cacheService.getOLAPDataSource(project);
            conn = dataSource.getConnection();
            DatabaseMetaData metaData = conn.getMetaData();

            jdbcTableMeta = metaData.getTables(null, null, null, null);

            while (jdbcTableMeta.next()) {
                String catalogName = jdbcTableMeta.getString(1);
                String schemaName = jdbcTableMeta.getString(2);

                // Not every JDBC data provider offers full 10 columns, e.g., PostgreSQL has only 5
                TableMetaWithType tblMeta = new TableMetaWithType(catalogName == null ? Constant.FakeCatalogName : catalogName, schemaName == null ? Constant.FakeSchemaName : schemaName, jdbcTableMeta.getString(3), jdbcTableMeta.getString(4), jdbcTableMeta.getString(5), null, null, null, null, null);

                if (!cubedOnly || getProjectManager().isExposedTable(project, schemaName + "." + tblMeta.getTABLE_NAME())) {
                    tableMetas.add(tblMeta);
                    tableMap.put(tblMeta.getTABLE_SCHEM() + "#" + tblMeta.getTABLE_NAME(), tblMeta);
                }
            }

            columnMeta = metaData.getColumns(null, null, null, null);

            while (columnMeta.next()) {
                String catalogName = columnMeta.getString(1);
                String schemaName = columnMeta.getString(2);

                // kylin(optiq) is not strictly following JDBC specification
                ColumnMetaWithType colmnMeta = new ColumnMetaWithType(catalogName == null ? Constant.FakeCatalogName : catalogName, schemaName == null ? Constant.FakeSchemaName : schemaName, columnMeta.getString(3), columnMeta.getString(4), columnMeta.getInt(5), columnMeta.getString(6), columnMeta.getInt(7), getInt(columnMeta.getString(8)), columnMeta.getInt(9), columnMeta.getInt(10), columnMeta.getInt(11), columnMeta.getString(12), columnMeta.getString(13), getInt(columnMeta.getString(14)), getInt(columnMeta.getString(15)), columnMeta.getInt(16), columnMeta.getInt(17), columnMeta.getString(18), columnMeta.getString(19), columnMeta.getString(20), columnMeta.getString(21), getShort(columnMeta.getString(22)), columnMeta.getString(23));

                if (!cubedOnly || getProjectManager().isExposedColumn(project, schemaName + "." + colmnMeta.getTABLE_NAME(), colmnMeta.getCOLUMN_NAME())) {
                    String tableKey = colmnMeta.getTABLE_SCHEM() + "#" + colmnMeta.getTABLE_NAME();
                    TableMetaWithType owner = tableMap.get(tableKey);
                    if (owner == null) {
                        // The column's table was not kept above; dereferencing it would be an NPE.
                        logger.warn("Skipping column {}#{}: its table is not among the collected tables", tableKey, colmnMeta.getCOLUMN_NAME());
                        continue;
                    }
                    owner.addColumn(colmnMeta);
                    columnMap.put(tableKey + "#" + colmnMeta.getCOLUMN_NAME(), colmnMeta);
                }
            }

        } finally {
            // Close the table result set before the connection, and make sure the
            // connection is released even if closing that result set fails.
            try {
                if (jdbcTableMeta != null) {
                    jdbcTableMeta.close();
                }
            } finally {
                close(columnMeta, null, conn);
            }
        }

        tagTypesFromModels(project, tableMap, columnMap);

        return tableMetas;
    }

    /**
     * Walks the project's models and adds table types (FACT, LOOKUP) and column types
     * (PK, FK, DIMENSION, MEASURE) to the collected metadata. Only models whose status is
     * null are considered; tables/columns unknown to the maps are silently ignored.
     */
    private void tagTypesFromModels(String project, Map<String, TableMetaWithType> tableMap, Map<String, ColumnMetaWithType> columnMap) throws IOException {
        ProjectInstance projectInstance = getProjectManager().getProject(project);
        for (String modelName : projectInstance.getModels()) {
            List<? extends DataModelDesc> models = modelServiceV2.listAllModels(modelName, project);
            if (models == null || models.isEmpty()) {
                // the project references a model that cannot be listed; nothing to tag
                continue;
            }
            DataModelDesc dataModelDesc = models.get(0);
            if (dataModelDesc.getStatus() != null) {
                continue;
            }

            // update table type: FACT
            // (inconsistencies should be reported once JDBC exposes all tables and columns:
            //  throw new BadRequestException(msg.getTABLE_META_INCONSISTENT());)
            for (TableRef factTable : dataModelDesc.getFactTables()) {
                tagTable(tableMap, factTable, TableMetaWithType.tableTypeEnum.FACT);
            }

            // update table type: LOOKUP
            for (TableRef lookupTable : dataModelDesc.getLookupTables()) {
                tagTable(tableMap, lookupTable, TableMetaWithType.tableTypeEnum.LOOKUP);
            }

            // update column type: PK and FK
            for (JoinTableDesc joinTableDesc : dataModelDesc.getJoinTables()) {
                JoinDesc joinDesc = joinTableDesc.getJoin();
                for (String pk : joinDesc.getPrimaryKey()) {
                    tagColumn(columnMap, columnKey(dataModelDesc, pk), ColumnMetaWithType.columnTypeEnum.PK);
                }

                for (String fk : joinDesc.getForeignKey()) {
                    tagColumn(columnMap, columnKey(dataModelDesc, fk), ColumnMetaWithType.columnTypeEnum.FK);
                }
            }

            // update column type: DIMENSION AND MEASURE
            List<ModelDimensionDesc> dimensions = dataModelDesc.getDimensions();
            for (ModelDimensionDesc dimension : dimensions) {
                for (String column : dimension.getColumns()) {
                    String columnIdentity = (dataModelDesc.findTable(dimension.getTable()).getTableIdentity() + "." + column).replace('.', '#');
                    tagColumn(columnMap, columnIdentity, ColumnMetaWithType.columnTypeEnum.DIMENSION);
                }
            }

            String[] measures = dataModelDesc.getMetrics();
            for (String measure : measures) {
                tagColumn(columnMap, columnKey(dataModelDesc, measure), ColumnMetaWithType.columnTypeEnum.MEASURE);
            }
        }
    }

    /**
     * Builds the "SCHEMA#TABLE#COLUMN" map key for a "table.column" reference of a model by
     * resolving the part before the first dot through {@code findTable}. Returns null when
     * the reference contains no dot.
     */
    private static String columnKey(DataModelDesc model, String qualifiedColumn) {
        int dot = qualifiedColumn.indexOf('.');
        if (dot < 0) {
            return null;
        }
        String tableIdentity = model.findTable(qualifiedColumn.substring(0, dot)).getTableIdentity();
        return (tableIdentity + qualifiedColumn.substring(dot)).replace('.', '#');
    }

    /** Adds {@code type} to the table's type set if the table is present in the map. */
    private static void tagTable(Map<String, TableMetaWithType> tableMap, TableRef table, TableMetaWithType.tableTypeEnum type) {
        TableMetaWithType meta = tableMap.get(table.getTableIdentity().replace('.', '#'));
        if (meta != null) {
            meta.getTYPE().add(type);
        }
    }

    /** Adds {@code type} to the column's type set if the key is non-null and present in the map. */
    private static void tagColumn(Map<String, ColumnMetaWithType> columnMap, String columnIdentity, ColumnMetaWithType.columnTypeEnum type) {
        if (columnIdentity == null) {
            return;
        }
        ColumnMetaWithType meta = columnMap.get(columnIdentity);
        if (meta != null) {
            meta.getTYPE().add(type);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java index 6f473e2..adae67c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/StreamingService.java @@ -25,7 +25,9 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.rest.constant.Constant; -import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.exception.BadRequestException; +import org.apache.kylin.rest.msg.Message; +import org.apache.kylin.rest.msg.MsgPicker; import org.springframework.security.access.prepost.PostFilter; import org.springframework.stereotype.Component; @@ -64,8 +66,10 @@ public class StreamingService extends BasicService { } public StreamingConfig createStreamingConfig(StreamingConfig config) throws IOException { + Message msg = MsgPicker.getMsg(); + if (getStreamingManager().getStreamingConfig(config.getName()) != null) { - throw new InternalErrorException("The streamingConfig named " + config.getName() + " already exists"); + throw new BadRequestException(String.format(msg.getSTREAMING_CONFIG_ALREADY_EXIST(), config.getName())); } StreamingConfig streamingConfig = getStreamingManager().saveStreamingConfig(config); return streamingConfig; http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index d4cb854..9f9b541 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -53,6 +53,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.stereotype.Component; @@ -62,15 +63,19 @@ public class TableService extends BasicService { private static final Logger logger = LoggerFactory.getLogger(TableService.class); @Autowired + @Qualifier("modelMgmtService") private ModelService modelService; @Autowired + @Qualifier("projectService") private ProjectService projectService; @Autowired + @Qualifier("streamingMgmtService") private StreamingService streamingService; @Autowired + @Qualifier("kafkaMgmtService") private KafkaConfigService kafkaConfigService; public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException { @@ -100,7 +105,7 @@ public class TableService extends BasicService { return result; } - private void unLoadHiveTable(String tableName) throws IOException { + protected void unLoadHiveTable(String tableName) throws IOException { tableName = normalizeHiveTableName(tableName); MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); metaMgr.removeSourceTable(tableName); @@ -111,7 +116,7 @@ public class TableService extends BasicService { getProjectManager().addTableDescToProject(tables, project); } - private void removeTableFromProject(String tableName, String projectName) throws IOException { + protected void removeTableFromProject(String tableName, String projectName) throws IOException { tableName = 
normalizeHiveTableName(tableName); getProjectManager().removeTableDescFromProject(tableName, projectName); } http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java new file mode 100644 index 0000000..5c3eeb3 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableServiceV2.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.rest.service; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.engine.mr.common.HadoopShellExecutable; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.BadRequestException; +import org.apache.kylin.rest.msg.Message; +import org.apache.kylin.rest.msg.MsgPicker; +import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob; +import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.stereotype.Component; + +import com.google.common.collect.Sets; + +/** + * Created by luwei on 17-4-24. 
+ *
+ * V2 table service. Adds cardinality-job submission and project-aware
+ * load/unload of Hive tables on top of {@link TableService}.
+ */
+@Component("tableServiceV2")
+public class TableServiceV2 extends TableService {
+
+    private static final Logger logger = LoggerFactory.getLogger(TableServiceV2.class);
+
+    @Autowired
+    @Qualifier("modelMgmtServiceV2")
+    private ModelServiceV2 modelServiceV2;
+
+    @Autowired
+    @Qualifier("projectServiceV2")
+    private ProjectServiceV2 projectServiceV2;
+
+    @Autowired
+    @Qualifier("streamingMgmtService")
+    private StreamingService streamingService;
+
+    @Autowired
+    @Qualifier("kafkaMgmtService")
+    private KafkaConfigService kafkaConfigService;
+
+    /**
+     * Submits a cardinality job for every given table, except for tables whose
+     * recorded cardinality job is currently in state {@code RUNNING}.
+     *
+     * @param tables    names of the tables to inspect
+     * @param submitter user name recorded as the submitter of any job created
+     * @throws IOException propagated from {@link #calculateCardinality(String, String)}
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+        MetadataManager metaMgr = getMetadataManager();
+        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
+        for (String table : tables) {
+            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
+            String jobID = tableExtDesc.getJodID();
+            // No job recorded yet, or the recorded job is not running any more: (re)submit.
+            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
+                calculateCardinality(table, submitter);
+            }
+        }
+    }
+
+    /**
+     * Generates column cardinality for a table. This submits a two-step chained
+     * job ({@link HiveColumnCardinalityJob} followed by
+     * {@link HiveColumnCardinalityUpdateJob}) and records the job id in the
+     * table's ext info before the job is handed to the executable manager.
+     *
+     * @param tableName table to profile; normalized before use
+     * @param submitter user name recorded as the job submitter
+     * @throws BadRequestException if no table descriptor exists for the table
+     * @throws IOException         declared by this method; it propagates from the metadata save
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinality(String tableName, String submitter) throws IOException {
+        Message msg = MsgPicker.getMsg();
+
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        if (table == null) {
+            BadRequestException e = new BadRequestException(String.format(msg.getTABLE_DESC_NOT_FOUND(), tableName));
+            logger.error("Cannot find table descriptor " + tableName, e);
+            throw e;
+        }
+        // Only look up the ext info once the table is known to exist.
+        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        // Make sure the job can be scheduled when the DistributedScheduler is enabled.
+        job.setParam("segmentId", tableName);
+        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
+        job.setSubmitter(submitter);
+
+        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
+        String param = "-table " + tableName + " -output " + outPath;
+
+        // Step 1: MapReduce job that computes the cardinality.
+        MapReduceExecutable step1 = new MapReduceExecutable();
+        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
+        job.addTask(step1);
+
+        // Step 2: shell step that runs the update job with the same parameters.
+        HadoopShellExecutable step2 = new HadoopShellExecutable();
+        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
+        job.addTask(step2);
+
+        // Record the job id on the table ext info first, then submit the job.
+        tableExt.setJodID(job.getId());
+        getMetadataManager().saveTableExt(tableExt);
+
+        getExecutableManager().addJob(job);
+    }
+
+    /**
+     * Unloads a table from a project.
+     *
+     * A table may be referenced by several projects while Kylin keeps only one
+     * copy of the metadata for each table. That is why there are two stages:
+     * the table is first detached from the given project, and its metadata is
+     * dropped only when no project and no model references it any more.
+     *
+     * @param tableName table to unload; normalized before use
+     * @param project   project to detach the table from
+     * @return {@code false} if no table descriptor exists or if dropping the
+     *         streaming/Kafka configs failed; {@code true} otherwise
+     * @throws BadRequestException if a model of the project still uses the table
+     * @throws IOException         declared by this method
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    public boolean unLoadHiveTableV2(String tableName, String project) throws IOException {
+        Message msg = MsgPicker.getMsg();
+
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc desc = getMetadataManager().getTableDesc(tableName);
+        if (desc == null) {
+            return false;
+        }
+        int tableType = desc.getSourceType();
+
+        // Stage 1: detach from this project, refusing while a model still uses the table.
+        if (modelServiceV2.isTableInModel(tableName, project)) {
+            List<String> models = modelServiceV2.getModelsUsingTable(tableName, project);
+            throw new BadRequestException(String.format(msg.getTABLE_IN_USE_BY_MODEL(), models));
+        }
+        removeTableFromProject(tableName, project);
+
+        // Stage 2: drop the shared metadata only when nothing references the table any more.
+        boolean orphaned = !projectServiceV2.isTableInAnyProject(tableName) && !modelServiceV2.isTableInAnyModel(tableName);
+        if (!orphaned) {
+            return true;
+        }
+        unLoadHiveTable(tableName);
+
+        // Source type 1 is treated as a streaming table here: remove streaming info as well.
+        if (tableType != 1) {
+            return true;
+        }
+        try {
+            StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(tableName);
+            KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
+            streamingService.dropStreamingConfig(config);
+            kafkaConfigService.dropKafkaConfig(kafkaConfig);
+            return true;
+        } catch (Exception e) {
+            // Best effort: report the failure through the return value instead of propagating.
+            logger.error(e.getLocalizedMessage(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Loads the given Hive tables into a project and optionally triggers
+     * cardinality calculation for the tables that were loaded.
+     *
+     * @param tableNames    tables requested for loading
+     * @param project       target project
+     * @param isNeedProfile whether to call
+     *                      {@link #calculateCardinalityIfNotPresent(String[], String)} on the loaded tables
+     * @return map with key {@code "result.loaded"} (names returned by the load) and
+     *         key {@code "result.unloaded"} (normalized requested names that are
+     *         not among the loaded ones)
+     * @throws IOException declared by this method
+     */
+    public Map<String, String[]> loadHiveTables(String[] tableNames, String project, boolean isNeedProfile) throws IOException {
+        String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+        Map<String, String[]> result = new HashMap<String, String[]>();
+
+        String[] loaded = loadHiveTablesToProject(tableNames, project);
+        result.put("result.loaded", loaded);
+
+        // Whatever was requested but not reported as loaded counts as unloaded.
+        Set<String> allTables = new HashSet<String>();
+        for (String tableName : tableNames) {
+            allTables.add(normalizeHiveTableName(tableName));
+        }
+        for (String loadedTableName : loaded) {
+            allTables.remove(loadedTableName);
+        }
+        result.put("result.unloaded", allTables.toArray(new String[allTables.size()]));
+
+        if (isNeedProfile) {
+            calculateCardinalityIfNotPresent(loaded, submitter);
+        }
+        return result;
+    }
+
+    /**
+     * Unloads several tables from a project, sorting them by outcome.
+     *
+     * @param tableNames tables to unload
+     * @param project    project to unload them from
+     * @return map with key {@code "result.unload.success"} and key
+     *         {@code "result.unload.fail"}, each holding the table names for
+     *         which {@link #unLoadHiveTableV2(String, String)} returned
+     *         {@code true} or {@code false} respectively
+     * @throws IOException declared by this method
+     */
+    public Map<String, String[]> unloadHiveTables(String[] tableNames, String project) throws IOException {
+        Set<String> unLoadSuccess = Sets.newHashSet();
+        Set<String> unLoadFail = Sets.newHashSet();
+        Map<String, String[]> result = new HashMap<String, String[]>();
+
+        for (String tableName : tableNames) {
+            if (unLoadHiveTableV2(tableName, project)) {
+                unLoadSuccess.add(tableName);
+            } else {
+                unLoadFail.add(tableName);
+            }
+        }
+
+        result.put("result.unload.success", unLoadSuccess.toArray(new String[unLoadSuccess.size()]));
+        result.put("result.unload.fail", unLoadFail.toArray(new String[unLoadFail.size()]));
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java index 2d8e006..f37d447 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/UserService.java @@ -31,6 +31,9 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.rest.constant.Constant; +import org.apache.kylin.rest.exception.InternalErrorException; +import org.apache.kylin.rest.msg.Message; +import org.apache.kylin.rest.msg.MsgPicker; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; @@ -55,7 +58,6 @@ public class UserService implements UserDetailsManager { @PostConstruct public void init() throws IOException { aclStore = ResourceStore.getStore(KylinConfig.getInstanceFromEnv()); - logger.debug("UserService init"); } @Override @@ -73,7 +75,7 @@ public class UserService implements UserDetailsManager { aclStore.putResource(id, new UserInfo(user), 0, UserInfoSerializer.getInstance()); logger.debug("update user : {}", user.getUsername()); } catch (IOException e) { - throw new RuntimeException(e); + throw new InternalErrorException(e); } } @@ -84,7 +86,7 @@ public class UserService implements UserDetailsManager { aclStore.deleteResource(id); logger.debug("delete user : {}", userName); } catch (IOException e) { - throw new RuntimeException(e); + throw new InternalErrorException(e); } } @@ -99,25 +101,26 @@ public class UserService implements UserDetailsManager { logger.debug("judge user exist: {}", userName); return aclStore.exists(getId(userName)); } catch (IOException e) { - throw new RuntimeException(e); + throw new InternalErrorException(e); } } @Override public UserDetails loadUserByUsername(String userName) throws UsernameNotFoundException { + Message msg = MsgPicker.getMsg(); try { UserInfo userInfo = aclStore.getResource(getId(userName), UserInfo.class, UserInfoSerializer.getInstance()); if (userInfo == null) { - throw new UsernameNotFoundException("User:" + userName + " Not found"); + throw new UsernameNotFoundException(String.format(msg.getUSER_NOT_FOUND(), userName)); } logger.debug("load user : {}", userName); return wrap(userInfo); } catch (IOException e) { - throw new RuntimeException(e); + throw new InternalErrorException(e); } } - public List<String> listUserAuthorities() { + public List<String> listUserAuthorities() throws IOException { List<String> all = new ArrayList<String>(); for (UserDetails user : listUsers()) { for (GrantedAuthority auth : 
user.getAuthorities()) { @@ -129,15 +132,11 @@ public class UserService implements UserDetailsManager { return all; } - public List<UserDetails> listUsers() { + public List<UserDetails> listUsers() throws IOException { List<UserDetails> all = new ArrayList<UserDetails>(); - try { - List<UserInfo> userInfos = aclStore.getAllResources(DIR_PREFIX, UserInfo.class, UserInfoSerializer.getInstance()); - for (UserInfo info : userInfos) { - all.add(wrap(info)); - } - } catch (IOException e) { - throw new RuntimeException("Failed to list users", e); + List<UserInfo> userInfos = aclStore.getAllResources(DIR_PREFIX, UserInfo.class, UserInfoSerializer.getInstance()); + for (UserInfo info : userInfos) { + all.add(wrap(info)); } return all; } @@ -146,7 +145,7 @@ public class UserService implements UserDetailsManager { return DIR_PREFIX + userName; } - private User wrap(UserInfo userInfo) { + protected User wrap(UserInfo userInfo) { if (userInfo == null) return null; List<GrantedAuthority> authorities = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server-base/src/main/java/org/apache/kylin/rest/util/ControllerSplitter.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/ControllerSplitter.java b/server-base/src/main/java/org/apache/kylin/rest/util/ControllerSplitter.java new file mode 100644 index 0000000..e043327 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/ControllerSplitter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.IOUtils; +
+/**
+ * Developer utility that rewrites the controller sources in place: from every
+ * file in {@link #v1dir} it removes the members annotated with the v2 media
+ * type, and from every file in {@link #v2dir} the members annotated with the
+ * plain JSON media type. Paths are relative to the working directory.
+ */
+public class ControllerSplitter {
+
+    static File v1dir = new File("src/main/java/org/apache/kylin/rest/controller");
+    static File v2dir = new File("src/main/java/org/apache/kylin/rest/controller2");
+    /** When true, files are only analysed and reported, never rewritten. */
+    static boolean dryRun = false;
+
+    public static void main(String[] args) throws IOException {
+
+        for (File f : listSources(v1dir)) {
+            chopOff(f, "application/vnd.apache.kylin-v2+json");
+        }
+
+        for (File f : listSources(v2dir)) {
+            chopOff(f, "application/json");
+        }
+    }
+
+    /**
+     * Lists the entries of a directory, failing with a descriptive error
+     * instead of a NullPointerException when the directory cannot be listed.
+     *
+     * @param dir directory to list
+     * @return the directory entries, never null
+     * @throws IOException if {@code dir} is not a directory or cannot be read
+     */
+    private static File[] listSources(File dir) throws IOException {
+        File[] files = dir.listFiles();
+        if (files == null) {
+            throw new IOException("Cannot list directory " + dir.getAbsolutePath());
+        }
+        return files;
+    }
+
+    /**
+     * Removes from a file every region that starts at an annotation line
+     * containing {@code annoPtn} and ends at the next closing-brace line, then
+     * writes the file back unless {@link #dryRun} is set or nothing was removed.
+     *
+     * NOTE(review): the two startsWith prefixes below are copied as they appear
+     * in this patch text; whitespace may have been collapsed in the archive, so
+     * confirm the intended indentation against the original source.
+     *
+     * @param f       source file to process
+     * @param annoPtn text that marks an annotation line as the start of a region to delete
+     * @throws IOException if the file cannot be read or written
+     */
+    private static void chopOff(File f, String annoPtn) throws IOException {
+
+        System.out.println("Processing " + f);
+
+        List<String> lines;
+        // try-with-resources: the stream is closed even when reading fails.
+        try (FileInputStream is = new FileInputStream(f)) {
+            lines = IOUtils.readLines(is, "UTF-8");
+        }
+        List<String> outLines = new ArrayList<>(lines.size());
+
+        boolean del = false;
+        for (String l : lines) {
+            // A matching annotation line opens a region to delete.
+            if (l.startsWith(" @") && l.contains(annoPtn))
+                del = true;
+
+            if (del)
+                System.out.println("x " + l);
+            else
+                outLines.add(l);
+
+            // The closing-brace line is itself deleted, then the region ends.
+            if (del && l.startsWith(" }"))
+                del = false;
+        }
+
+        if (!dryRun && outLines.size() < lines.size()) {
+            try (FileOutputStream os = new FileOutputStream(f)) {
+                IOUtils.writeLines(outLines, "\n", os, "UTF-8");
+            }
+            System.out.println("UPDATED " + f);
+        } else {
+            System.out.println("skipped");
+        }
+
+        System.out.println("============================================================================");
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/main/resources/applicationContext.xml ---------------------------------------------------------------------- diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml index 081dc53..100b202 100644 --- a/server/src/main/resources/applicationContext.xml +++ b/server/src/main/resources/applicationContext.xml @@ -49,7 +49,7 @@ <!-- Rest service binding --> <bean - class="org.springframework.web.servlet.mvc.annotation.DefaultAnnotationHandlerMapping"/> + class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping"/> <bean id="mappingJacksonHttpMessageConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter"/> @@ -59,7 +59,7 @@ class="org.springframework.http.converter.FormHttpMessageConverter"/> <bean - class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> + class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter"> <property name="messageConverters"> <list> <ref bean="mappingJacksonHttpMessageConverter"/> http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/AccessControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/AccessControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/AccessControllerTest.java index fea98cd..18fbd06 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/AccessControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/AccessControllerTest.java @@ -32,6 +32,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.access.AccessDeniedException; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; @@ -62,12 +63,15 @@ public class AccessControllerTest extends ServiceTestBase implements AclEntityTy private String ADMIN = "ADMIN"; @Autowired + @Qualifier("projectService") ProjectService projectService; @Autowired + @Qualifier("cubeMgmtService") CubeService cubeService; @Autowired + @Qualifier("accessService") AccessService accessService; @Before http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/AdminControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/AdminControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/AdminControllerTest.java index e6b6dc0..823e68a 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/AdminControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/AdminControllerTest.java @@ -27,6 +27,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -36,8 +37,11 @@ public class AdminControllerTest extends ServiceTestBase { private AdminController adminController; @Autowired + @Qualifier("adminService") private AdminService adminService; + @Autowired + @Qualifier("cubeMgmtService") private CubeService cubeService; @Before http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/CacheControllerTest.java ---------------------------------------------------------------------- diff --git 
a/server/src/test/java/org/apache/kylin/rest/controller/CacheControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/CacheControllerTest.java index cda0d61..c2e21cc 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/CacheControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/CacheControllerTest.java @@ -25,6 +25,7 @@ import org.apache.kylin.rest.service.ServiceTestBase; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author shaoshi @@ -34,6 +35,7 @@ public class CacheControllerTest extends ServiceTestBase { private CacheController cacheController; @Autowired + @Qualifier("cacheService") private CacheService cacheService; @Before http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java index 5c010ef..e67c238 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java @@ -38,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -48,10 +49,15 @@ public class CubeControllerTest extends ServiceTestBase { private CubeDescController cubeDescController; @Autowired + @Qualifier("cubeMgmtService") CubeService cubeService; + @Autowired + @Qualifier("jobService") JobService jobService; + @Autowired + @Qualifier("streamingMgmtService") StreamingService 
streamingService; @Before http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java index 3cd994b..e87cecb 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/JobControllerTest.java @@ -41,6 +41,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -50,9 +51,11 @@ public class JobControllerTest extends ServiceTestBase { private JobController jobSchedulerController; private CubeController cubeController; @Autowired + @Qualifier("jobService") JobService jobService; @Autowired + @Qualifier("cubeMgmtService") CubeService cubeService; private static final String CUBE_NAME = "new_job_controller"; http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/ProjectControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/ProjectControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/ProjectControllerTest.java index cd9a524..f805095 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/ProjectControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/ProjectControllerTest.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.beans.factory.annotation.Qualifier; 
/** */ @@ -41,6 +42,7 @@ public class ProjectControllerTest extends ServiceTestBase { private ProjectController projectController; @Autowired + @Qualifier("projectService") ProjectService projectService; @Before http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java index 2043fef..d9eb3fa 100644 --- a/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java +++ b/server/src/test/java/org/apache/kylin/rest/controller/QueryControllerTest.java @@ -31,6 +31,7 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import net.sf.ehcache.CacheManager; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -39,7 +40,9 @@ public class QueryControllerTest extends ServiceTestBase { private QueryController queryController; @Autowired + @Qualifier("queryService") QueryService queryService; + @Autowired private CacheManager cacheManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java index f27a121..481b0bf 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/AccessServiceTest.java @@ -26,6 +26,7 @@ import org.apache.kylin.rest.security.AclPermissionFactory; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; 
+import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.acls.domain.PrincipalSid; import org.springframework.security.acls.model.AccessControlEntry; import org.springframework.security.acls.model.Acl; @@ -37,6 +38,7 @@ import org.springframework.security.acls.model.Sid; public class AccessServiceTest extends ServiceTestBase { @Autowired + @Qualifier("accessService") AccessService accessService; @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java index 59e96d6..a190d6d 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/CubeServiceTest.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import com.fasterxml.jackson.core.JsonProcessingException; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -36,9 +37,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; public class CubeServiceTest extends ServiceTestBase { @Autowired + @Qualifier("cubeMgmtService") CubeService cubeService; @Autowired + @Qualifier("cacheService") private CacheService cacheService; @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index 4150808..0932748 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ 
b/server/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -26,6 +26,7 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -33,9 +34,11 @@ import org.springframework.beans.factory.annotation.Autowired; public class JobServiceTest extends ServiceTestBase { @Autowired + @Qualifier("jobService") JobService jobService; @Autowired + @Qualifier("cacheService") private CacheService cacheService; @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java index 550aeb8..7e749a9 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java @@ -35,10 +35,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; public class ModelServiceTest extends ServiceTestBase { @Autowired + @Qualifier("modelMgmtService") ModelService modelService; @Rule http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index ca4e34c..1e5a7a5 100644 --- 
a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -28,6 +28,7 @@ import org.apache.kylin.rest.response.SQLResponse; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; /** * @author xduo @@ -35,9 +36,11 @@ import org.springframework.beans.factory.annotation.Autowired; public class QueryServiceTest extends ServiceTestBase { @Autowired + @Qualifier("queryService") QueryService queryService; @Autowired + @Qualifier("cacheService") private CacheService cacheService; @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java index 7370c48..b45b27b 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java +++ b/server/src/test/java/org/apache/kylin/rest/service/ServiceTestBase.java @@ -31,6 +31,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.authentication.TestingAuthenticationToken; import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; @@ -48,6 +49,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; public class ServiceTestBase extends LocalFileMetadataTestCase { @Autowired + @Qualifier("userService") UserService userService; @BeforeClass 
http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/server/src/test/java/org/apache/kylin/rest/service/UserServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/UserServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/UserServiceTest.java index 3b3a738..0d4b580 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/UserServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/UserServiceTest.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.service; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -25,6 +26,7 @@ import org.apache.kylin.rest.constant.Constant; import org.junit.Assert; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.authority.SimpleGrantedAuthority; import org.springframework.security.core.userdetails.User; @@ -36,10 +38,11 @@ import org.springframework.security.core.userdetails.UserDetails; public class UserServiceTest extends ServiceTestBase { @Autowired + @Qualifier("userService") UserService userService; @Test - public void testBasics() { + public void testBasics() throws IOException { userService.deleteUser("ADMIN"); Assert.assertTrue(!userService.userExists("ADMIN")); http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index e388074..bdbe321 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -61,8 +61,8 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { - public static String getTableNameForHCat(TableDesc table, boolean isFullTable) { - String tableName = (table.isView() || isFullTable == false) ? table.getMaterializedName() : table.getName(); + public static String getTableNameForHCat(TableDesc table) { + String tableName = (table.isView()) ? table.getMaterializedName() : table.getName(); return String.format("%s.%s", table.getDatabase(), tableName).toUpperCase(); } @@ -73,12 +73,7 @@ public class HiveMRInput implements IMRInput { @Override public IMRTableInputFormat getTableInputFormat(TableDesc table) { - return new HiveTableInputFormat(getTableNameForHCat(table, true)); - } - - @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table, boolean isFullTable) { - return new HiveTableInputFormat(getTableNameForHCat(table, isFullTable)); + return new HiveTableInputFormat(getTableNameForHCat(table)); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index 9033d67..ffd54db 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -63,7 +63,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab String tableName = conf.get(BatchConstants.CFG_TABLE_NAME); tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName); - tableInputFormat = 
MRUtil.getTableInputFormat(tableDesc, true); + tableInputFormat = MRUtil.getTableInputFormat(tableDesc); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index a837681..f439ccb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -87,7 +87,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); // Mapper - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, true); + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table); tableInputFormat.configureJob(job); job.setMapperClass(ColumnCardinalityMapper.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/73a78dbe/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 4c140be..3323afb 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -76,11 +76,6 @@ public class KafkaMRInput implements IMRInput { @Override public IMRTableInputFormat getTableInputFormat(TableDesc table) { - return getTableInputFormat(table, true); - } - - @Override - public 
IMRTableInputFormat getTableInputFormat(TableDesc table, boolean isFullTable) { KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() {