This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 367fdfb  KYLIN-3434 Support prepare statement in Kylin server side
367fdfb is described below

commit 367fdfbac44e7a3ce37873ae66457f7b1de68951
Author: Ma,Gang <[email protected]>
AuthorDate: Thu Jun 28 18:11:38 2018 +0800

    KYLIN-3434 Support prepare statement in Kylin server side
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 +
 .../java/org/apache/kylin/cube/CubeManager.java    |   1 +
 .../kylin/metadata/project/ProjectManager.java     |  12 +
 pom.xml                                            |   6 +
 server-base/pom.xml                                |   4 +
 .../kylin/rest/request/PrepareSqlRequest.java      |   9 +
 .../apache/kylin/rest/service/QueryService.java    | 292 ++++++++++++++++-----
 7 files changed, 267 insertions(+), 65 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c9e2b97..dbf22b5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1398,6 +1398,14 @@ abstract public class KylinConfigBase implements 
Serializable {
         return getOptional("kylin.query.access-controller", null);
     }
 
+    public int getQueryMaxCacheStatementNum() {
+        return 
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num", 
String.valueOf(50000)));
+    }
+
+    public int getQueryMaxCacheStatementInstancePerKey() {
+        return 
Integer.parseInt(this.getOptional("kylin.query.statement-cache-max-num-per-key",
 String.valueOf(50)));
+    }
+
     public int getDimCountDistinctMaxCardinality() {
         return 
Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", 
"5000000"));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index d6fca79..bde2f8f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -264,6 +264,7 @@ public class CubeManager implements IRealizationProvider {
             cube = cube.latestCopyForWrite(); // get a latest copy
             CubeUpdate update = new CubeUpdate(cube);
             update.setStatus(newStatus);
+            ProjectManager.getInstance(config).touchProject(cube.getProject());
             return updateCube(update);
         }
     }
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 25908cd..2d8542e 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -355,6 +355,18 @@ public class ProjectManager {
             save(projectInstance);
         }
     }
+
+    /**
+     * change the last project modify time
+     * @param projectName
+     * @throws IOException
+     */
+    public void touchProject(String projectName) throws IOException {
+        try (AutoLock lock = prjMapLock.lockForWrite()) {
+            ProjectInstance projectInstance = getProject(projectName);
+            save(projectInstance);
+        }
+    }
     
     private ProjectInstance save(ProjectInstance prj) throws IOException {
         crud.save(prj);
diff --git a/pom.xml b/pom.xml
index b5a346b..d9b9efe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
         <commons-upload.version>1.3.3</commons-upload.version>
         <commons-math3.version>3.1.1</commons-math3.version>
         <commons-collections.version>3.2.2</commons-collections.version>
+        <commons-pool.version>2.5.0</commons-pool.version>
 
         <!-- Calcite deps, keep compatible with calcite.version -->
         <jackson.version>2.9.5</jackson.version>
@@ -507,6 +508,11 @@
                 <version>${commons-collections.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-pool2</artifactId>
+                <version>${commons-pool.version}</version>
+            </dependency>
 
             <!-- HBase2 dependencies -->
             <dependency>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 455ae78..baa6433 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -85,6 +85,10 @@
             <groupId>net.sf.supercsv</groupId>
             <artifactId>super-csv</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
 
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
index 97a4863..48e382a 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/request/PrepareSqlRequest.java
@@ -28,6 +28,7 @@ import java.util.Collections;
  * 
  */
 public class PrepareSqlRequest extends SQLRequest {
+    private boolean enableStatementCache = true;
 
     public PrepareSqlRequest() {
         super();
@@ -43,6 +44,14 @@ public class PrepareSqlRequest extends SQLRequest {
         this.params = params;
     }
 
+    public boolean isEnableStatementCache() {
+        return enableStatementCache;
+    }
+
+    public void setEnableStatementCache(boolean enableStatementCache) {
+        this.enableStatementCache = enableStatementCache;
+    }
+
     public static class StateParam implements Serializable {
         private String className;
         private String value;
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 4e3fe07..f195e74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,15 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.annotation.PostConstruct;
 
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.Element;
+
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.CalcitePrepare;
@@ -55,6 +60,11 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.BasicSqlType;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
@@ -113,10 +123,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import net.sf.ehcache.Cache;
-import net.sf.ehcache.CacheManager;
-import net.sf.ehcache.Element;
-
 /**
  * @author xduo
  */
@@ -144,11 +150,26 @@ public class QueryService extends BasicService {
     @Autowired
     private AclEvaluate aclEvaluate;
 
+    private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> 
preparedContextPool;
+
     public QueryService() {
         queryStore = ResourceStore.getStore(getConfig());
+        preparedContextPool = createPreparedContextPool();
         badQueryDetector.start();
     }
 
+    private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> 
createPreparedContextPool() {
+        PreparedContextFactory factory = new PreparedContextFactory();
+        KylinConfig kylinConfig = getConfig();
+        GenericKeyedObjectPoolConfig config = new 
GenericKeyedObjectPoolConfig();
+        
config.setMaxTotalPerKey(kylinConfig.getQueryMaxCacheStatementInstancePerKey());
+        config.setMaxTotal(kylinConfig.getQueryMaxCacheStatementNum());
+        config.setBlockWhenExhausted(false);
+        config.setMinEvictableIdleTimeMillis(10 * 60 * 1000L); // cached 
statement will be evict if idle for 10 minutes
+        GenericKeyedObjectPool<PreparedContextKey, PreparedContext> pool = new 
GenericKeyedObjectPool<>(factory, config);
+        return pool;
+    }
+
     protected static void close(ResultSet resultSet, Statement stat, 
Connection conn) {
         OLAPContext.clearParameter();
         DBUtils.closeQuietly(resultSet);
@@ -502,10 +523,13 @@ public class QueryService extends BasicService {
 
     private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws 
Exception {
         Connection conn = null;
-
+        boolean isPrepareRequest = isPrepareStatementWithParams(sqlRequest);
+        boolean borrowPrepareContext = false;
+        PreparedContextKey preparedContextKey = null;
+        PreparedContext preparedContext = null;
+        
         try {
             conn = QueryConnection.getConnection(sqlRequest.getProject());
-
             String userInfo = 
SecurityContextHolder.getContext().getAuthentication().getName();
             QueryContext context = QueryContextFacade.current();
             context.setUsername(userInfo);
@@ -539,11 +563,45 @@ public class QueryService extends BasicService {
             OLAPContext.setParameters(parameters);
             // force clear the query context before a new query
             OLAPContext.clearThreadLocalContexts();
-
-            return execute(correctedSql, sqlRequest, conn);
+            
+            // special case for prepare query.
+            List<List<String>> results = Lists.newArrayList();
+            List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+            if (BackdoorToggles.getPrepareOnly()) {
+                return getPrepareOnlySqlResponse(correctedSql, conn, false, 
results, columnMetas);
+            }
+            if (!isPrepareRequest) {
+                return executeRequest(correctedSql, sqlRequest, conn);
+            } else {
+                long prjLastModifyTime = 
getProjectManager().getProject(sqlRequest.getProject()).getLastModified();
+                preparedContextKey = new 
PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql);
+                PrepareSqlRequest prepareSqlRequest = (PrepareSqlRequest) 
sqlRequest;
+                if (prepareSqlRequest.isEnableStatementCache()) {
+                    try {
+                        preparedContext = 
preparedContextPool.borrowObject(preparedContextKey);
+                        borrowPrepareContext = true;
+                    } catch (NoSuchElementException noElementException) {
+                        borrowPrepareContext = false;
+                        preparedContext = 
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+                    }
+                    for(OLAPContext olapContext : 
preparedContext.olapContexts) {
+                        OLAPContext.registerContext(olapContext);
+                    }
+                } else {
+                    preparedContext = 
createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql());
+                }
+                return executePrepareRequest(correctedSql, prepareSqlRequest, 
preparedContext);
+            }
 
         } finally {
             DBUtils.closeQuietly(conn);
+            if (preparedContext != null) {
+                if (borrowPrepareContext) {
+                    preparedContextPool.returnObject(preparedContextKey, 
preparedContext);
+                } else {
+                    preparedContext.close();
+                }
+            }
         }
     }
 
@@ -783,83 +841,100 @@ public class QueryService extends BasicService {
      * @return
      * @throws Exception
      */
-    private SQLResponse execute(String correctedSql, SQLRequest sqlRequest, 
Connection conn) throws Exception {
+    private SQLResponse executeRequest(String correctedSql, SQLRequest 
sqlRequest, Connection conn) throws Exception {
         Statement stat = null;
         ResultSet resultSet = null;
         boolean isPushDown = false;
 
-        List<List<String>> results = Lists.newArrayList();
-        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
-
+        Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
         try {
+            stat = conn.createStatement();
+            processStatementAttr(stat, sqlRequest);
+            resultSet = stat.executeQuery(correctedSql);
 
-            // special case for prepare query.
-            if (BackdoorToggles.getPrepareOnly()) {
-                return getPrepareOnlySqlResponse(correctedSql, conn, 
isPushDown, results, columnMetas);
-            }
+            r = createResponseFromResultSet(resultSet); 
 
-            if (isPrepareStatementWithParams(sqlRequest)) {
+        } catch (SQLException sqlException) {
+            r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
+            if (r == null)
+                throw sqlException;
 
-                stat = conn.prepareStatement(correctedSql); // to be closed in 
the finally
-                PreparedStatement prepared = (PreparedStatement) stat;
-                processStatementAttr(prepared, sqlRequest);
-                for (int i = 0; i < ((PrepareSqlRequest) 
sqlRequest).getParams().length; i++) {
-                    setParam(prepared, i + 1, ((PrepareSqlRequest) 
sqlRequest).getParams()[i]);
-                }
-                resultSet = prepared.executeQuery();
-            } else {
-                stat = conn.createStatement();
-                processStatementAttr(stat, sqlRequest);
-                resultSet = stat.executeQuery(correctedSql);
-            }
+            isPushDown = true;
+        } finally {
+            close(resultSet, stat, null); //conn is passed in, not my duty to 
close
+        }
 
-            ResultSetMetaData metaData = resultSet.getMetaData();
-            int columnCount = metaData.getColumnCount();
-
-            // Fill in selected column meta
-            for (int i = 1; i <= columnCount; ++i) {
-                columnMetas.add(new 
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i),
-                        metaData.isSearchable(i), metaData.isCurrency(i), 
metaData.isNullable(i), metaData.isSigned(i),
-                        metaData.getColumnDisplaySize(i), 
metaData.getColumnLabel(i), metaData.getColumnName(i),
-                        metaData.getSchemaName(i), metaData.getCatalogName(i), 
metaData.getTableName(i),
-                        metaData.getPrecision(i), metaData.getScale(i), 
metaData.getColumnType(i),
-                        metaData.getColumnTypeName(i), metaData.isReadOnly(i), 
metaData.isWritable(i),
-                        metaData.isDefinitelyWritable(i)));
-            }
+        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+    }
 
-            // fill in results
-            while (resultSet.next()) {
-                List<String> oneRow = 
Lists.newArrayListWithCapacity(columnCount);
-                for (int i = 0; i < columnCount; i++) {
-                    oneRow.add((resultSet.getString(i + 1)));
-                }
+    private SQLResponse executePrepareRequest(String correctedSql, 
PrepareSqlRequest sqlRequest, PreparedContext preparedContext)
+            throws Exception {
+        ResultSet resultSet = null;
+        boolean isPushDown = false;
 
-                results.add(oneRow);
+        Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
+        Connection conn = preparedContext.conn;
+        try {
+            PreparedStatement preparedStatement = 
preparedContext.preparedStatement;
+            processStatementAttr(preparedStatement, sqlRequest);
+            for (int i = 0; i < sqlRequest.getParams().length; i++) {
+                setParam(preparedStatement, i + 1, 
(sqlRequest.getParams()[i]));
             }
-
+            resultSet = preparedStatement.executeQuery();
+            r = createResponseFromResultSet(resultSet);
         } catch (SQLException sqlException) {
-            Pair<List<List<String>>, List<SelectedColumnMeta>> r = null;
-            try {
-                r = 
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, 
conn.getSchema(),
-                        sqlException, BackdoorToggles.getPrepareOnly());
-            } catch (Exception e2) {
-                logger.error("pushdown engine failed current query too", e2);
-                //exception in pushdown, throw it instead of exception in 
calcite
-                throw e2;
-            }
-
+            r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException);
             if (r == null)
                 throw sqlException;
 
             isPushDown = true;
-            results = r.getFirst();
-            columnMetas = r.getSecond();
-
         } finally {
-            close(resultSet, stat, null); //conn is passed in, not my duty to 
close
+            DBUtils.closeQuietly(resultSet);
         }
 
-        return buildSqlResponse(isPushDown, results, columnMetas);
+        return buildSqlResponse(isPushDown, r.getFirst(), r.getSecond());
+    }
+
+    private Pair<List<List<String>>, List<SelectedColumnMeta>> 
pushDownQuery(SQLRequest sqlRequest, String correctedSql, Connection conn, 
SQLException sqlException) throws Exception{
+        try {
+            return 
PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, 
conn.getSchema(),
+                    sqlException, BackdoorToggles.getPrepareOnly());
+        } catch (Exception e2) {
+            logger.error("pushdown engine failed current query too", e2);
+            //exception in pushdown, throw it instead of exception in calcite
+            throw e2;
+        }
+    }
+
+    private Pair<List<List<String>>, List<SelectedColumnMeta>> 
createResponseFromResultSet(ResultSet resultSet)
+            throws Exception {
+        List<List<String>> results = Lists.newArrayList();
+        List<SelectedColumnMeta> columnMetas = Lists.newArrayList();
+
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        int columnCount = metaData.getColumnCount();
+
+        // Fill in selected column meta
+        for (int i = 1; i <= columnCount; ++i) {
+            columnMetas.add(new 
SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), 
metaData
+                    .isSearchable(i), metaData.isCurrency(i), 
metaData.isNullable(i), metaData.isSigned(i), metaData
+                    .getColumnDisplaySize(i), metaData.getColumnLabel(i), 
metaData.getColumnName(i), metaData
+                    .getSchemaName(i), metaData.getCatalogName(i), 
metaData.getTableName(i), metaData.getPrecision(i),
+                    metaData.getScale(i), metaData.getColumnType(i), 
metaData.getColumnTypeName(i), metaData
+                            .isReadOnly(i), metaData.isWritable(i), 
metaData.isDefinitelyWritable(i)));
+        }
+
+        // fill in results
+        while (resultSet.next()) {
+            List<String> oneRow = Lists.newArrayListWithCapacity(columnCount);
+            for (int i = 0; i < columnCount; i++) {
+                oneRow.add((resultSet.getString(i + 1)));
+            }
+
+            results.add(oneRow);
+        }
+
+        return new Pair<>(results, columnMetas);
     }
 
     protected String makeErrorMsgUserFriendly(Throwable e) {
@@ -1044,6 +1119,13 @@ public class QueryService extends BasicService {
         this.cacheManager = cacheManager;
     }
 
+    private static PreparedContext createPreparedContext(String project, 
String sql) throws Exception{
+        Connection conn = QueryConnection.getConnection(project);
+        PreparedStatement preparedStatement = conn.prepareStatement(sql);
+        Collection<OLAPContext> olapContexts = 
OLAPContext.getThreadLocalContexts();
+        return new PreparedContext(conn, preparedStatement, olapContexts);
+    }
+
     private static class QueryRecordSerializer implements 
Serializer<QueryRecord> {
 
         private static final QueryRecordSerializer serializer = new 
QueryRecordSerializer();
@@ -1069,6 +1151,86 @@ public class QueryService extends BasicService {
         }
     }
 
+    private static class PreparedContextFactory extends
+            BaseKeyedPooledObjectFactory<PreparedContextKey, PreparedContext> {
+
+        @Override
+        public PreparedContext create(PreparedContextKey key) throws Exception 
{
+            return createPreparedContext(key.project, key.sql);
+        }
+
+        @Override
+        public PooledObject<PreparedContext> wrap(PreparedContext value) {
+            return new DefaultPooledObject<>(value);
+        }
+
+        @Override
+        public void destroyObject(final PreparedContextKey key, final 
PooledObject<PreparedContext> p) {
+            PreparedContext cachedContext = p.getObject();
+            cachedContext.close();
+        }
+
+        @Override
+        public boolean validateObject(final PreparedContextKey key, final 
PooledObject<PreparedContext> p) {
+            return true;
+        }
+    }
+
+    private static class PreparedContextKey {
+        private String project;
+        private long prjLastModifyTime;
+        private String sql;
+
+        public PreparedContextKey(String project, long prjLastModifyTime, 
String sql) {
+            this.project = project;
+            this.prjLastModifyTime = prjLastModifyTime;
+            this.sql = sql;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            PreparedContextKey that = (PreparedContextKey) o;
+
+            if (prjLastModifyTime != that.prjLastModifyTime) return false;
+            if (project != null ? !project.equals(that.project) : that.project 
!= null) return false;
+            return sql != null ? sql.equals(that.sql) : that.sql == null;
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = project != null ? project.hashCode() : 0;
+            result = 31 * result + (int) (prjLastModifyTime ^ 
(prjLastModifyTime >>> 32));
+            result = 31 * result + (sql != null ? sql.hashCode() : 0);
+            return result;
+        }
+    }
+
+    private static class PreparedContext {
+        private Connection conn;
+        private PreparedStatement preparedStatement;
+        private Collection<OLAPContext> olapContexts;
+
+        public PreparedContext(Connection conn, PreparedStatement 
preparedStatement,
+                               Collection<OLAPContext> olapContexts) {
+            this.conn = conn;
+            this.preparedStatement = preparedStatement;
+            this.olapContexts = olapContexts;
+        }
+
+        public void close() {
+            if (conn != null) {
+                DBUtils.closeQuietly(conn);
+            }
+            if (preparedStatement != null) {
+                DBUtils.closeQuietly(preparedStatement);
+            }
+        }
+    }
+
 }
 
 @SuppressWarnings("serial")

Reply via email to