This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 367fdfb KYLIN-3434 Support prepare statement in Kylin server side
367fdfb is described below
commit 367fdfbac44e7a3ce37873ae66457f7b1de68951
Author: Ma,Gang <[email protected]>
AuthorDate: Thu Jun 28 18:11:38 2018 +0800
KYLIN-3434 Support prepare statement in Kylin server side
---
.../org/apache/kylin/common/KylinConfigBase.java | 8 +
.../java/org/apache/kylin/cube/CubeManager.java | 1 +
.../kylin/metadata/project/ProjectManager.java | 12 +
pom.xml | 6 +
server-base/pom.xml | 4 +
.../kylin/rest/request/PrepareSqlRequest.java | 9 +
.../apache/kylin/rest/service/QueryService.java | 292 ++++++++++++++++-----
7 files changed, 267 insertions(+), 65 deletions(-)
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c9e2b97..dbf22b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1398,6 +1398,14 @@ abstract public class KylinConfigBase implements
Serializable {
return getOptional("kylin.query.access-controller", null);
}
+ public int getQueryMaxCacheStatementNum() {
+ return
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num",
String.valueOf(50000)));
+ }
+
+ public int getQueryMaxCacheStatementInstancePerKey() {
+ return
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num-per-key",
String.valueOf(50)));
+ }
+
public int getDimCountDistinctMaxCardinality() {
return
Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct",
"5000000"));
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d6fca79..bde2f8f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -264,6 +264,7 @@ public class CubeManager implements IRealizationProvider {
cube = cube.latestCopyForWrite(); // get a latest copy
CubeUpdate update = new CubeUpdate(cube);
update.setStatus(newStatus);
+ ProjectManager.getInstance(config).touchProject(cube.getProject());
return updateCube(update);
}
}
diff --git
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 25908cd..2d8542e 100644
---
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -355,6 +355,18 @@ public class ProjectManager {
save(projectInstance);
}
}
+
+ /**
+ * Change the project's last-modified time
+ * @param projectName
+ * @throws IOException
+ */
+ public void touchProject(String projectName) throws IOException {
+ try (AutoLock lock = prjMapLock.lockForWrite()) {
+ ProjectInstance projectInstance = getProject(projectName);
+ save(projectInstance);
+ }
+ }
private ProjectInstance save(ProjectInstance prj) throws IOException {
crud.save(prj);
diff --git a/pom.xml b/pom.xml
index b5a346b..d9b9efe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
<commons-upload.version>1.3.3</commons-upload.version>
<commons-math3.version>3.1.1</commons-math3.version>
<commons-collections.version>3.2.2</commons-collections.version>
+ <commons-pool.version>2.5.0</commons-pool.version>
<!-- Calcite deps, keep compatible with calcite.version -->
<jackson.version>2.9.5</jackson.version>
@@ -507,6 +508,11 @@
<version>${commons-collections.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>${commons-pool.version}</version>
+ </dependency>
<!-- HBase2 dependencies -->
<dependency>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 455ae78..baa6433 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -85,6 +85,10 @@
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
index 97a4863..48e382a 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
@@ -28,6 +28,7 @@ import java.util.Collections;
*
*/
public class PrepareSqlRequest extends SQLRequest {
+ private boolean enableStatementCache = true;
public PrepareSqlRequest() {
super();
@@ -43,6 +44,14 @@ public class PrepareSqlRequest extends SQLRequest {
this.params = params;
}
+ public boolean isEnableStatementCache() {
+ return enableStatementCache;
+ }
+
+ public void setEnableStatementCache(boolean enableStatementCache) {
+ this.enableStatementCache = enableStatementCache;
+ }
+
public static class StateParam implements Serializable {
private String className;
private String value;
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e3fe07..f195e74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,15 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.PostConstruct;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+
import org.apache.calcite.avatica.ColumnMetaData.Rep;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalcitePrepare;
@@ -55,6 +60,11 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.BasicSqlType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
@@ -113,10 +123,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-
/**
* @author xduo
*/
@@ -144,11 +150,26 @@ public class QueryService extends BasicService {
@Autowired
private AclEvaluate aclEvaluate;
+ private GenericKeyedObjectPool<PreparedContextKey, PreparedContext>
preparedContextPool;
+
public QueryService() {
queryStore = ResourceStore.getStore(getConfig());
+ preparedContextPool = createPreparedContextPool();
badQueryDetector.start();
}
+ private GenericKeyedObjectPool<PreparedContextKey, PreparedContext>
createPreparedContextPool() {
+ PreparedContextFactory factory = new PreparedContextFactory();
+ KylinConfig kylinConfig = getConfig();
+ GenericKeyedObjectPoolConfig config = new
GenericKeyedObjectPoolConfig();
+
config.setMaxTotalPerKey(kylinConfig.getQueryMaxCacheStatementInstancePerKey());
+ config.setMaxTotal(kylinConfig.getQueryMaxCacheStatementNum());
+ config.setBlockWhenExhausted(false);
+ config.setMinEvictableIdleTimeMillis(10 * 60 * 1000L); // cached
statement will be evicted if idle for 10 minutes
+ GenericKeyedObjectPool<PreparedContextKey, PreparedContext> pool = new
GenericKeyedObjectPool<>(factory, config);
+ return pool;
+ }
+
protected static void close(ResultSet resultSet, Statement stat,
Connection conn) {
OLAPContext.clearParameter();
DBUtils.closeQuietly(resultSet);
@@ -502,10 +523,13 @@ public class QueryService extends BasicService {
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws
Exception {
Connection conn = null;
-
+ boolean isPrepareRequest = isPrepareStatementWithParams(sqlRequest);
+ boolean borrowPrepareContext = false;
+ PreparedContextKey preparedContextKey = null;
+ PreparedContext preparedContext = null;
+
try {
conn = QueryConnection.getConnection(sqlRequest.getProject());
-
String userInfo =
SecurityContextHolder.getContext().getAuthentication().getName();
QueryContext context = QueryContextFacade.current();
context.setUsername(userInfo);
@@ -539,11 +563,45 @@ public class QueryService extends BasicService {
OLAPContext.setParameters(parameters);
// force clear the query context before a new query
OLAPContext.clearThreadLocalContexts();
-
- return execute(correctedSql, sqlRequest, conn);
+
+ // special case for prepare query.
+ List<List<String>> results = Lists.newArrayList();
+ List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+ if (BackdoorToggles.getPrepareOnly()) {
+ return getPrepareOnlySqlResponse(correctedSql, conn, false,
results, columnMetas);
+ }
+ if (!isPrepareRequest) {
+ return executeRequest(correctedSql, sqlRequest, conn);
+ } else {
+ long prjLastModifyTime =
getProjectManager().getProject(sqlRequest.getProject()).getLastModified();
+ preparedContextKey = new
PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql);
+ PrepareSqlRequest prepareSqlRequest = (PrepareSqlRequest)
sqlRequest;
+ if (prepareSqlRequest.isEnableStatementCache()) {
+ try {
+ preparedContext =
preparedContextPool.borrowObject(preparedContextKey);
+ borrowPrepareContext = true;
+ } catch (NoSuchElementException noElementException) {
+ borrowPrepareContext = false;
+ preparedContext =
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+ }
+ for(OLAPContext olapContext :
preparedContext.olapContexts) {
+ OLAPContext.registerContext(olapContext);
+ }
+ } else {
+ preparedContext =
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+ }
+ return executePrepareRequest(correctedSql, prepareSqlRequest,
preparedContext);
+ }
} finally {
DBUtils.closeQuietly(conn);
+ if (preparedContext != null) {
+ if (borrowPrepareContext) {
+ preparedContextPool.returnObject(preparedContextKey,
preparedContext);
+ } else {
+ preparedContext.close();
+ }
+ }
}
}
@@ -783,83 +841,100 @@ public class QueryService extends BasicService {
* @return
* @throws Exception
*/
- private SQLResponse execute(String correctedSql, SQLRequest sqlRequest,
Connection conn) throws Exception {
+ private SQLResponse executeRequest(String correctedSql, SQLRequest
sqlRequest, Connection conn) throws Exception {
Statement stat = null;
ResultSet resultSet = null;
boolean isPushDown = false;
- List<List<String>> results = Lists.newArrayList();
- List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-
+ Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
try {
+ stat = conn.createStatement();
+ processStatementAttr(stat, sqlRequest);
+ resultSet = stat.executeQuery(correctedSql);
- // special case for prepare query.
- if (BackdoorToggles.getPrepareOnly()) {
- return getPrepareOnlySqlResponse(correctedSql, conn,
isPushDown, results, columnMetas);
- }
+ r = createResponseFromResultSet(resultSet);
- if (isPrepareStatementWithParams(sqlRequest)) {
+ } catch (SQLException sqlException) {
+ r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
+ if (r == null)
+ throw sqlException;
- stat = conn.prepareStatement(correctedSql); // to be closed in
the finally
- PreparedStatement prepared = (PreparedStatement) stat;
- processStatementAttr(prepared, sqlRequest);
- for (int i = 0; i < ((PrepareSqlRequest)
sqlRequest).getParams().length; i++) {
- setParam(prepared, i + 1, ((PrepareSqlRequest)
sqlRequest).getParams()[i]);
- }
- resultSet = prepared.executeQuery();
- } else {
- stat = conn.createStatement();
- processStatementAttr(stat, sqlRequest);
- resultSet = stat.executeQuery(correctedSql);
- }
+ isPushDown = true;
+ } finally {
+ close(resultSet, stat, null); //conn is passed in, not my duty to
close
+ }
- ResultSetMetaData metaData = resultSet.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- // Fill in selected column meta
- for (int i = 1; i <= columnCount; ++i) {
- columnMetas.add(new
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
- metaData.isSearchable(i), metaData.isCurrency(i),
metaData.isNullable(i), metaData.isSigned(i),
- metaData.getColumnDisplaySize(i),
metaData.getColumnLabel(i), metaData.getColumnName(i),
- metaData.getSchemaName(i), metaData.getCatalogName(i),
metaData.getTableName(i),
- metaData.getPrecision(i), metaData.getScale(i),
metaData.getColumnType(i),
- metaData.getColumnTypeName(i), metaData.isReadOnly(i),
metaData.isWritable(i),
- metaData.isDefinitelyWritable(i)));
- }
+ return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+ }
- // fill in results
- while (resultSet.next()) {
- List<String> oneRow =
Lists.newArrayListWithCapacity(columnCount);
- for (int i = 0; i < columnCount; i++) {
- oneRow.add((resultSet.getString(i + 1)));
- }
+ private SQLResponse executePrepareRequest(String correctedSql,
PrepareSqlRequest sqlRequest, PreparedContext preparedContext)
+ throws Exception {
+ ResultSet resultSet = null;
+ boolean isPushDown = false;
- results.add(oneRow);
+ Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
+ Connection conn = preparedContext.conn;
+ try {
+ PreparedStatement preparedStatement =
preparedContext.preparedStatement;
+ processStatementAttr(preparedStatement, sqlRequest);
+ for (int i = 0; i < sqlRequest.getParams().length; i++) {
+ setParam(preparedStatement, i + 1,
(sqlRequest.getParams()[i]));
}
-
+ resultSet = preparedStatement.executeQuery();
+ r = createResponseFromResultSet(resultSet);
} catch (SQLException sqlException) {
- Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
- try {
- r =
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql,
conn.getSchema(),
- sqlException, BackdoorToggles.getPrepareOnly());
- } catch (Exception e2) {
- logger.error("pushdown engine failed current query too", e2);
- //exception in pushdown, throw it instead of exception in
calcite
- throw e2;
- }
-
+ r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
if (r == null)
throw sqlException;
isPushDown = true;
- results = r.getFirst();
- columnMetas = r.getSecond();
-
} finally {
- close(resultSet, stat, null); //conn is passed in, not my duty to
close
+ DBUtils.closeQuietly(resultSet);
}
- return buildSqlResponse(isPushDown, results, columnMetas);
+ return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+ }
+
+ private Pair<List<List<String>>, List<SelectedColumnMeta>>
pushDownQuery(SQLRequest sqlRequest, String correctedSql, Connection conn,
SQLException sqlException) throws Exception{
+ try {
+ return
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql,
conn.getSchema(),
+ sqlException, BackdoorToggles.getPrepareOnly());
+ } catch (Exception e2) {
+ logger.error("pushdown engine failed current query too", e2);
+ //exception in pushdown, throw it instead of exception in calcite
+ throw e2;
+ }
+ }
+
+ private Pair<List<List<String>>, List<SelectedColumnMeta>>
createResponseFromResultSet(ResultSet resultSet)
+ throws Exception {
+ List<List<String>> results = Lists.newArrayList();
+ List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ int columnCount = metaData.getColumnCount();
+
+ // Fill in selected column meta
+ for (int i = 1; i <= columnCount; ++i) {
+ columnMetas.add(new
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
metaData
+ .isSearchable(i), metaData.isCurrency(i),
metaData.isNullable(i), metaData.isSigned(i), metaData
+ .getColumnDisplaySize(i), metaData.getColumnLabel(i),
metaData.getColumnName(i), metaData
+ .getSchemaName(i), metaData.getCatalogName(i),
metaData.getTableName(i), metaData.getPrecision(i),
+ metaData.getScale(i), metaData.getColumnType(i),
metaData.getColumnTypeName(i), metaData
+ .isReadOnly(i), metaData.isWritable(i),
metaData.isDefinitelyWritable(i)));
+ }
+
+ // fill in results
+ while (resultSet.next()) {
+ List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
+ for (int i = 0; i < columnCount; i++) {
+ oneRow.add((resultSet.getString(i + 1)));
+ }
+
+ results.add(oneRow);
+ }
+
+ return new Pair<>(results, columnMetas);
}
protected String makeErrorMsgUserFriendly(Throwable e) {
@@ -1044,6 +1119,13 @@ public class QueryService extends BasicService {
this.cacheManager = cacheManager;
}
+ private static PreparedContext createPreparedContext(String project,
String sql) throws Exception{
+ Connection conn = QueryConnection.getConnection(project);
+ PreparedStatement preparedStatement = conn.prepareStatement(sql);
+ Collection<OLAPContext> olapContexts =
OLAPContext.getThreadLocalContexts();
+ return new PreparedContext(conn, preparedStatement, olapContexts);
+ }
+
private static class QueryRecordSerializer implements
Serializer<QueryRecord> {
private static final QueryRecordSerializer serializer = new
QueryRecordSerializer();
@@ -1069,6 +1151,86 @@ public class QueryService extends BasicService {
}
}
+ private static class PreparedContextFactory extends
+ BaseKeyedPooledObjectFactory<PreparedContextKey, PreparedContext> {
+
+ @Override
+ public PreparedContext create(PreparedContextKey key) throws Exception
{
+ return createPreparedContext(key.project, key.sql);
+ }
+
+ @Override
+ public PooledObject<PreparedContext> wrap(PreparedContext value) {
+ return new DefaultPooledObject<>(value);
+ }
+
+ @Override
+ public void destroyObject(final PreparedContextKey key, final
PooledObject<PreparedContext> p) {
+ PreparedContext cachedContext = p.getObject();
+ cachedContext.close();
+ }
+
+ @Override
+ public boolean validateObject(final PreparedContextKey key, final
PooledObject<PreparedContext> p) {
+ return true;
+ }
+ }
+
+ private static class PreparedContextKey {
+ private String project;
+ private long prjLastModifyTime;
+ private String sql;
+
+ public PreparedContextKey(String project, long prjLastModifyTime,
String sql) {
+ this.project = project;
+ this.prjLastModifyTime = prjLastModifyTime;
+ this.sql = sql;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PreparedContextKey that = (PreparedContextKey) o;
+
+ if (prjLastModifyTime != that.prjLastModifyTime) return false;
+ if (project != null ? !project.equals(that.project) : that.project
!= null) return false;
+ return sql != null ? sql.equals(that.sql) : that.sql == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = project != null ? project.hashCode() : 0;
+ result = 31 * result + (int) (prjLastModifyTime ^
(prjLastModifyTime >>> 32));
+ result = 31 * result + (sql != null ? sql.hashCode() : 0);
+ return result;
+ }
+ }
+
+ private static class PreparedContext {
+ private Connection conn;
+ private PreparedStatement preparedStatement;
+ private Collection<OLAPContext> olapContexts;
+
+ public PreparedContext(Connection conn, PreparedStatement
preparedStatement,
+ Collection<OLAPContext> olapContexts) {
+ this.conn = conn;
+ this.preparedStatement = preparedStatement;
+ this.olapContexts = olapContexts;
+ }
+
+ public void close() {
+ if (conn != null) {
+ DBUtils.closeQuietly(conn);
+ }
+ if (preparedStatement != null) {
+ DBUtils.closeQuietly(preparedStatement);
+ }
+ }
+ }
+
}
@SuppressWarnings("serial")