This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new a9e050d73e KYLIN-5181 Support to use postgreSQL to store metadata
a9e050d73e is described below

commit a9e050d73eecbbb6c54e7adadc86e0cac3f6d241
Author: Tengting Xu <34978943+muk...@users.noreply.github.com>
AuthorDate: Wed Nov 2 10:43:03 2022 +0800

    KYLIN-5181 Support to use postgreSQL to store metadata
    
    * KYLIN-5181, support to use postgreSQL to store metadata
    
    * minor fix
    
    * minor fix, remove useless comment
    
    * minor fix, the same index name cannot be created twice in a schema
    
    * resolve conflict
---
 core-common/pom.xml                                |  8 +++--
 .../common/persistence/JDBCResourceStore.java      | 34 +++++++++++++++-------
 .../kylin/common/persistence/ResourceTool.java     | 12 +++++---
 .../resources/metadata-jdbc-postgresql.properties  | 33 +++++++++++++++++++++
 pom.xml                                            |  9 ++++++
 5 files changed, 80 insertions(+), 16 deletions(-)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 45b22d6276..547e08604c 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -7,9 +7,9 @@
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at
- 
+
      http://www.apache.org/licenses/LICENSE-2.0
- 
+
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -101,6 +101,10 @@
             <artifactId>mysql-connector-java</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.mockito</groupId>
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index b121af8892..3fcc806066 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -50,6 +50,7 @@ public class JDBCResourceStore extends PushdownResourceStore {
     private static final String META_TABLE_KEY = "META_TABLE_KEY";
     private static final String META_TABLE_TS = "META_TABLE_TS";
     private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT";
+    private static final String DIALECT_OF_PG = "postgresql";
     private static Logger logger = 
LoggerFactory.getLogger(JDBCResourceStore.class);
     private JDBCConnectionManager connectionManager;
 
@@ -123,6 +124,9 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
 
                 try {
                     String indexName = "IDX_" + META_TABLE_TS;
+                    if 
(DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) {
+                        indexName += System.currentTimeMillis();
+                    }
                     String createIndexSql = sqls.getCreateIndexSql(indexName, 
tableName, META_TABLE_TS);
                     logger.info("Creating index: {}", createIndexSql);
                     pstat = connection.prepareStatement(createIndexSql);
@@ -314,7 +318,13 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
         if (rs == null) {
             return null;
         }
-
+        if (DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) {
+            InputStream inputStream = rs.getBinaryStream(META_TABLE_CONTENT);
+            if (inputStream == null) {
+                return openPushdown(resPath);
+            }
+            return inputStream;
+        }
         Blob blob = rs.getBlob(META_TABLE_CONTENT);
 
         if (blob == null || blob.length() == 0) {
@@ -355,13 +365,13 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                     if (existing) {
                         pstat = 
connection.prepareStatement(sqls.getReplaceSql());
                         pstat.setLong(1, ts);
-                        pstat.setBlob(2, new BufferedInputStream(new 
ByteArrayInputStream(bytes)));
+                        pstat.setBinaryStream(2, new BufferedInputStream(new 
ByteArrayInputStream(bytes)));
                         pstat.setString(3, resPath);
                     } else {
                         pstat = 
connection.prepareStatement(sqls.getInsertSql());
                         pstat.setString(1, resPath);
                         pstat.setLong(2, ts);
-                        pstat.setBlob(3, new BufferedInputStream(new 
ByteArrayInputStream(bytes)));
+                        pstat.setBinaryStream(3, new BufferedInputStream(new 
ByteArrayInputStream(bytes)));
                     }
 
                     if (isContentOverflow(bytes, resPath)) {
@@ -376,8 +386,9 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                         RollbackablePushdown pushdown = writePushdown(resPath, 
ContentWriter.create(bytes));
                         try {
                             int result = pstat.executeUpdate();
-                            if (result != 1)
+                            if (result != 1) {
                                 throw new SQLException();
+                            }
                         } catch (Exception e) {
                             pushdown.rollback();
                             throw e;
@@ -414,10 +425,11 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
         }
 
         int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize();
-        if (content.length > maxSize)
+        if (content.length > maxSize) {
             return true;
-        else
+        } else {
             return false;
+        }
     }
 
     @Override
@@ -454,8 +466,9 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                             RollbackablePushdown pushdown = 
writePushdown(resPath, ContentWriter.create(content));
                             try {
                                 int result = pstat.executeUpdate();
-                                if (result != 1)
+                                if (result != 1) {
                                     throw new SQLException();
+                                }
                             } catch (Throwable e) {
                                 pushdown.rollback();
                                 throw e;
@@ -466,7 +479,7 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                             pstat = 
connection.prepareStatement(sqls.getInsertSql());
                             pstat.setString(1, resPath);
                             pstat.setLong(2, newTS);
-                            pstat.setBlob(3, new BufferedInputStream(new 
ByteArrayInputStream(content)));
+                            pstat.setBinaryStream(3, new 
BufferedInputStream(new ByteArrayInputStream(content)));
                             pstat.executeUpdate();
                         }
                     } else {
@@ -481,8 +494,9 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
                             RollbackablePushdown pushdown = 
writePushdown(resPath, ContentWriter.create(content));
                             try {
                                 int result = pstat.executeUpdate();
-                                if (result != 1)
+                                if (result != 1) {
                                     throw new SQLException();
+                                }
                             } catch (Throwable e) {
                                 pushdown.rollback();
                                 throw e;
@@ -647,4 +661,4 @@ public class JDBCResourceStore extends 
PushdownResourceStore {
         abstract public void execute(final Connection connection) throws 
SQLException, IOException;
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
index 809df253f1..976e15b5a8 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java
@@ -70,8 +70,9 @@ public class ResourceTool {
             tool.addExcludes(exclude.split("\\s*,\\s*"));
         }
         String group = System.getProperty("group");
-        if (group != null)
+        if (group != null) {
             tool.parallelCopyGroupSize = Integer.parseInt(group);
+        }
 
         tool.addExcludes(IMMUTABLE_PREFIX.toArray(new 
String[IMMUTABLE_PREFIX.size()]));
 
@@ -151,8 +152,9 @@ public class ResourceTool {
 
     private void copyParallel(KylinConfig from, KylinConfig to, String folder) 
throws IOException {
         ResourceParallelCopier copier = new 
ResourceParallelCopier(ResourceStore.getStore(from), 
ResourceStore.getStore(to));
-        if (parallelCopyGroupSize > 0)
+        if (parallelCopyGroupSize > 0) {
             copier.setGroupSize(parallelCopyGroupSize);
+        }
 
         Stats stats = copier.copy(folder, includes, excludes, new Stats() {
 
@@ -178,10 +180,12 @@ public class ResourceTool {
         });
 
         if (stats.hasError()) {
-            for (String errGroup : stats.errorGroups)
+            for (String errGroup : stats.errorGroups) {
                 System.out.println("Failed to copy resource group: " + 
errGroup + "*");
-            for (String errResPath : stats.errorResourcePaths)
+            }
+            for (String errResPath : stats.errorResourcePaths) {
                 System.out.println("Failed to copy resource: " + errResPath);
+            }
             throw new IOException("Failed to copy " + 
stats.errorResource.get() + " resource");
         }
     }
diff --git a/core-common/src/main/resources/metadata-jdbc-postgresql.properties 
b/core-common/src/main/resources/metadata-jdbc-postgresql.properties
new file mode 100644
index 0000000000..a7863f333e
--- /dev/null
+++ b/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+###JDBC METASTORE
+format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) 
primary key, {2} BIGINT, {3} BYTEA )
+format.sql.key-equals=select {0} from {1} where {2} = ?
+format.sql.delete-pstat=delete from {0}  where {1} = ?
+format.sql.list-resource=select {0} from {1} where {2} like ?
+format.sql.all-resource=select {0} from {1} where {2} like ? escape ''#'' and 
{3} >= ? and {4} < ?
+format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ?
+format.sql.insert=insert into {0}({1},{2},{3}) values(?,?,?)
+format.sql.replace-without-content=update {0} set {1} = ? where {2} = ?
+format.sql.insert-without-content=insert into {0}({1},{2}) values(?,?)
+format.sql.update-content-ts=update {0} set {1}=?,{2} = ? where {3}=? and {4}=?
+format.sql.test.create=create table if not exists {0} (name VARCHAR(255) 
primary key, id BIGINT)
+format.sql.test.drop=drop table if exists {0}
+format.sql.create-index=create index {0} on {1} ({2})
+format.sql.check-table-exists=SELECT table_name FROM information_schema.tables 
\
+  WHERE table_schema='''public''' AND table_type='''BASE TABLE''' AND 
table_name=''{0}'';
diff --git a/pom.xml b/pom.xml
index ac02fb639f..bb0d8cca22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,9 @@
     <!-- mysql versions -->
     <mysql-connector.version>5.1.8</mysql-connector.version>
 
+    <!-- PostgreSQL versions -->
+    <postgresql.version>42.3.3</postgresql.version>
+
     <!-- Scala versions -->
     <scala.version>2.12.10</scala.version>
     <scala.binary.version>2.12</scala.binary.version>
@@ -593,6 +596,11 @@
         <version>${mysql-connector.version}</version>
         <scope>provided</scope>
       </dependency>
+      <dependency>
+        <groupId>org.postgresql</groupId>
+        <artifactId>postgresql</artifactId>
+        <version>${postgresql.version}</version>
+      </dependency>
       <!-- Hive dependencies -->
       <dependency>
         <groupId>org.apache.hive</groupId>
@@ -1837,6 +1845,7 @@
         </plugins>
       </build>
     </profile>
+
     <profile>
       <id>m2e-only</id>
       <activation>

Reply via email to