This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new a9e050d73e KYLIN-5181 Support to use postgreSQL to store metadata a9e050d73e is described below commit a9e050d73eecbbb6c54e7adadc86e0cac3f6d241 Author: Tengting Xu <34978943+muk...@users.noreply.github.com> AuthorDate: Wed Nov 2 10:43:03 2022 +0800 KYLIN-5181 Support to use postgreSQL to store metadata * KYLIN-5181, support to use postgreSQL to store metadata * minor fix * minor fix, remove useless comment * minor fix, same index name can not create in a schema * resolve confict --- core-common/pom.xml | 8 +++-- .../common/persistence/JDBCResourceStore.java | 34 +++++++++++++++------- .../kylin/common/persistence/ResourceTool.java | 12 +++++--- .../resources/metadata-jdbc-postgresql.properties | 33 +++++++++++++++++++++ pom.xml | 9 ++++++ 5 files changed, 80 insertions(+), 16 deletions(-) diff --git a/core-common/pom.xml b/core-common/pom.xml index 45b22d6276..547e08604c 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -7,9 +7,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -101,6 +101,10 @@ <artifactId>mysql-connector-java</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + </dependency> <dependency> <groupId>org.mockito</groupId> diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java index b121af8892..3fcc806066 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java @@ -50,6 +50,7 @@ public class JDBCResourceStore extends PushdownResourceStore { private static final String META_TABLE_KEY = "META_TABLE_KEY"; private static final String META_TABLE_TS = "META_TABLE_TS"; private static final String META_TABLE_CONTENT = "META_TABLE_CONTENT"; + private static final String DIALECT_OF_PG = "postgresql"; private static Logger logger = LoggerFactory.getLogger(JDBCResourceStore.class); private JDBCConnectionManager connectionManager; @@ -123,6 +124,9 @@ public class JDBCResourceStore extends PushdownResourceStore { try { String indexName = "IDX_" + META_TABLE_TS; + if (DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) { + indexName += System.currentTimeMillis(); + } String createIndexSql = sqls.getCreateIndexSql(indexName, tableName, META_TABLE_TS); logger.info("Creating index: {}", createIndexSql); pstat = connection.prepareStatement(createIndexSql); @@ -314,7 +318,13 @@ public class JDBCResourceStore extends PushdownResourceStore { if (rs == null) { return null; } - + if (DIALECT_OF_PG.equals(kylinConfig.getMetadataDialect())) { + InputStream inputStream = rs.getBinaryStream(META_TABLE_CONTENT); + if (inputStream == null) { + return openPushdown(resPath); + } + return inputStream; + } Blob blob = rs.getBlob(META_TABLE_CONTENT); if (blob == null || blob.length() == 0) { @@ -355,13 +365,13 @@ public class JDBCResourceStore extends PushdownResourceStore { if (existing) { pstat = connection.prepareStatement(sqls.getReplaceSql()); pstat.setLong(1, ts); - pstat.setBlob(2, new BufferedInputStream(new ByteArrayInputStream(bytes))); + pstat.setBinaryStream(2, new BufferedInputStream(new ByteArrayInputStream(bytes))); pstat.setString(3, resPath); } else { pstat = connection.prepareStatement(sqls.getInsertSql()); pstat.setString(1, resPath); pstat.setLong(2, ts); - pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(bytes))); + pstat.setBinaryStream(3, new BufferedInputStream(new ByteArrayInputStream(bytes))); } if (isContentOverflow(bytes, resPath)) { @@ -376,8 +386,9 @@ public class JDBCResourceStore extends PushdownResourceStore { RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(bytes)); try { int result = pstat.executeUpdate(); - if (result != 1) + if (result != 1) { throw new SQLException(); + } } catch (Exception e) { pushdown.rollback(); throw e; @@ -414,10 +425,11 @@ public class JDBCResourceStore extends PushdownResourceStore { } int maxSize = kylinConfig.getJdbcResourceStoreMaxCellSize(); - if (content.length > maxSize) + if (content.length > maxSize) { return true; - else + } else { return false; + } } @Override @@ -454,8 +466,9 @@ public class JDBCResourceStore extends PushdownResourceStore { RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(content)); try { int result = pstat.executeUpdate(); - if (result != 1) + if (result != 1) { throw new SQLException(); + } } catch (Throwable e) { pushdown.rollback(); throw e; @@ -466,7 +479,7 @@ public class JDBCResourceStore extends PushdownResourceStore { pstat = connection.prepareStatement(sqls.getInsertSql()); pstat.setString(1, resPath); pstat.setLong(2, newTS); - pstat.setBlob(3, new BufferedInputStream(new ByteArrayInputStream(content))); + pstat.setBinaryStream(3, new BufferedInputStream(new ByteArrayInputStream(content))); pstat.executeUpdate(); } } else { @@ -481,8 +494,9 @@ public class JDBCResourceStore extends PushdownResourceStore { RollbackablePushdown pushdown = writePushdown(resPath, ContentWriter.create(content)); try { int result = pstat.executeUpdate(); - if (result != 1) + if (result != 1) { throw new SQLException(); + } } catch (Throwable e) { pushdown.rollback(); throw e; @@ -647,4 +661,4 @@ public class JDBCResourceStore extends PushdownResourceStore { abstract public void execute(final Connection connection) throws SQLException, IOException; } -} \ No newline at end of file +} diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java index 809df253f1..976e15b5a8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java @@ -70,8 +70,9 @@ public class ResourceTool { tool.addExcludes(exclude.split("\\s*,\\s*")); } String group = System.getProperty("group"); - if (group != null) + if (group != null) { tool.parallelCopyGroupSize = Integer.parseInt(group); + } tool.addExcludes(IMMUTABLE_PREFIX.toArray(new String[IMMUTABLE_PREFIX.size()])); @@ -151,8 +152,9 @@ public class ResourceTool { private void copyParallel(KylinConfig from, KylinConfig to, String folder) throws IOException { ResourceParallelCopier copier = new ResourceParallelCopier(ResourceStore.getStore(from), ResourceStore.getStore(to)); - if (parallelCopyGroupSize > 0) + if (parallelCopyGroupSize > 0) { copier.setGroupSize(parallelCopyGroupSize); + } Stats stats = copier.copy(folder, includes, excludes, new Stats() { @@ -178,10 +180,12 @@ public class ResourceTool { }); if (stats.hasError()) { - for (String errGroup : stats.errorGroups) + for (String errGroup : stats.errorGroups) { System.out.println("Failed to copy resource group: " + errGroup + "*"); - for (String errResPath : stats.errorResourcePaths) + } + for (String errResPath : stats.errorResourcePaths) { System.out.println("Failed to copy resource: " + errResPath); + } throw new IOException("Failed to copy " + stats.errorResource.get() + " resource"); } } diff --git a/core-common/src/main/resources/metadata-jdbc-postgresql.properties b/core-common/src/main/resources/metadata-jdbc-postgresql.properties new file mode 100644 index 0000000000..a7863f333e --- /dev/null +++ b/core-common/src/main/resources/metadata-jdbc-postgresql.properties @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###JDBC METASTORE +format.sql.create-if-need=create table if not exists {0} ( {1} VARCHAR(255) primary key, {2} BIGINT, {3} BYTEA ) +format.sql.key-equals=select {0} from {1} where {2} = ? +format.sql.delete-pstat=delete from {0} where {1} = ? +format.sql.list-resource=select {0} from {1} where {2} like ? +format.sql.all-resource=select {0} from {1} where {2} like ? escape ''#'' and {3} >= ? and {4} < ? +format.sql.replace=update {0} set {1} = ?,{2} = ? where {3} = ? +format.sql.insert=insert into {0}({1},{2},{3}) values(?,?,?) +format.sql.replace-without-content=update {0} set {1} = ? where {2} = ? +format.sql.insert-without-content=insert into {0}({1},{2}) values(?,?) +format.sql.update-content-ts=update {0} set {1}=?,{2} = ? where {3}=? and {4}=? +format.sql.test.create=create table if not exists {0} (name VARCHAR(255) primary key, id BIGINT) +format.sql.test.drop=drop table if exists {0} +format.sql.create-index=create index {0} on {1} ({2}) +format.sql.check-table-exists=SELECT table_name FROM information_schema.tables \ + WHERE table_schema='''public''' AND table_type='''BASE TABLE''' AND table_name=''{0}''; diff --git a/pom.xml b/pom.xml index ac02fb639f..bb0d8cca22 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,9 @@ <!-- mysql versions --> <mysql-connector.version>5.1.8</mysql-connector.version> + <!-- postgre SQL versions --> + <postgresql.version>42.3.3</postgresql.version> + <!-- Scala versions --> <scala.version>2.12.10</scala.version> <scala.binary.version>2.12</scala.binary.version> @@ -593,6 +596,11 @@ <version>${mysql-connector.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${postgresql.version}</version> + </dependency> <!-- Hive dependencies --> <dependency> <groupId>org.apache.hive</groupId> @@ -1837,6 +1845,7 @@ </plugins> </build> </profile> + <profile> <id>m2e-only</id> <activation>