This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7973ad7139f018294b76cf690e52f4e4df107d50 Author: jlf <longfei.ji...@kyligence.io> AuthorDate: Thu Apr 27 20:41:05 2023 +0800 KYLIN-5647 add put hive_1_2_2 to HDFS before KE start (#30313) --- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++ .../org/apache/kylin/common/util/FileUtils.java | 9 +- .../apache/kylin/tool/hive/HiveClientJarTool.java | 104 +++++++++++++++++ .../kylin/tool/hive/HiveClientJarToolTest.java | 130 +++++++++++++++++++++ .../kylin/tool/hive/HiveClientJarToolTestBase.java | 77 ++++++++++++ .../HiveClientJarToolWithoutSparkHiveDirTest.java | 96 +++++++++++++++ 6 files changed, 423 insertions(+), 1 deletion(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 20cfa46a6c..ac7d894ee4 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1520,6 +1520,14 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.engine.spark-conf.spark.submit.deployMode", "client").toLowerCase(Locale.ROOT); } + public String getSparkSqlHiveMetastoreJarsPath() { + return getOptional("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", ""); + } + + public boolean getHiveClientJarUploadEnable() { + return Boolean.parseBoolean(getOptional("kylin.engine.hive-client-jar-upload.enable", FALSE)); + } + public String getSparkBuildClassName() { return getOptional("kylin.engine.spark.build-class-name", "org.apache.kylin.engine.spark.job.SegmentBuildJob"); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java b/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java index 9e9c4af186..a817726fc7 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.codec.binary.Base64; - import org.apache.kylin.guava30.shaded.common.base.Preconditions; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -57,6 +56,14 @@ public final class FileUtils { return Lists.newArrayList(); } + public static List<File> findFiles(String dir) { + File[] files = new File(dir).listFiles(); + if (files != null) { + return Lists.newArrayList(files); + } + return Lists.newArrayList(); + } + public static Map<String, String> readFromPropertiesFile(File file) { try (FileInputStream fileInputStream = new FileInputStream(file)) { return readFromPropertiesFile(fileInputStream); diff --git a/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java b/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java new file mode 100644 index 0000000000..e2bc30b59b --- /dev/null +++ b/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.hive; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.FileUtils; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.tool.util.ToolMainWrapper; + +import lombok.val; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HiveClientJarTool { + public static void main(String[] args) { + ToolMainWrapper.wrap(args, () -> { + HiveClientJarTool tool = new HiveClientJarTool(); + tool.execute(); + }); + Unsafe.systemExit(0); + } + + public void execute() throws IOException { + val config = KylinConfig.getInstanceFromEnv(); + if (!config.getHiveClientJarUploadEnable()) { + log.info("Not need upload hive client jar"); + return; + } + if (StringUtils.isBlank(config.getSparkSqlHiveMetastoreJarsPath())) { + log.warn("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path not setting"); + return; + } + + // In Read/Write Separation Deployment, clusters are cross-domain mutual trust + // can use ReadClusterFileSystem to access WriteClusterFileSystem + val fileSystem = HadoopUtil.getWorkingFileSystem(); + val sparkSqlHiveMetastoreJarsPath = config.getSparkSqlHiveMetastoreJarsPath(); + val jarsDirPath = new Path(sparkSqlHiveMetastoreJarsPath).getParent(); + val uploadHiveJarFlag = new Path(jarsDirPath, "_upload_hive_jar_by_pass"); + if (fileSystem.exists(uploadHiveJarFlag)) { + log.info("Not need upload Spark HIVE jars again"); + return; + } + + val kylinSparkHiveJarsPath = getKylinSparkHiveJarsPath(); + if (StringUtils.isBlank(kylinSparkHiveJarsPath)) { + log.warn("${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory"); + return; + } + + uploadHiveJars(fileSystem, uploadHiveJarFlag, kylinSparkHiveJarsPath, jarsDirPath); + } + + public void uploadHiveJars(FileSystem fileSystem, Path uploadHiveJarFlag, String kylinSparkHiveJarsPath, + Path jarsDirPath) throws IOException { + if (fileSystem.exists(jarsDirPath)) { + log.warn("HDFS dir [{}] exist, not upload hive client jar", jarsDirPath); + return; + } + fileSystem.mkdirs(jarsDirPath); + val hiveJars = FileUtils.findFiles(kylinSparkHiveJarsPath); + for (File jar : hiveJars) { + val sparkHiveJarPath = new Path(jar.getCanonicalPath()); + fileSystem.copyFromLocalFile(sparkHiveJarPath, jarsDirPath); + } + log.info("Upload Spark HIVE jars ending"); + try (val out = fileSystem.create(uploadHiveJarFlag, true)) { + out.write(new byte[] {}); + } + log.info("Upload Spark HIVE jars success"); + } + + public String getKylinSparkHiveJarsPath() throws IOException { + String sparkHome = KylinConfig.getSparkHome(); + val jar = FileUtils.findFile(sparkHome, "hive_1_2_2"); + if (jar == null || jar.isFile()) { + return ""; + } + return jar.getCanonicalPath(); + } +} diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java new file mode 100644 index 0000000000..106efd6fc1 --- /dev/null +++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.hive; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import lombok.val; + +@MetadataInfo +class HiveClientJarToolTest extends HiveClientJarToolTestBase { + + @BeforeEach + public void before() throws IOException { + super.before(); + val sparkHome = KylinConfig.getSparkHome(); + val sparkPath = Paths.get(sparkHome); + val hive122 = sparkHome + File.separator + "hive_1_2_2"; + val hive122Path = Paths.get(hive122); + val jar = hive122 + File.separator + "test.jar"; + val jarPath = Paths.get(jar); + Files.createDirectories(hive122Path); + Files.createFile(jarPath); + } + + @AfterEach + public void after() throws IOException { + super.after(); + val sparkHome = KylinConfig.getSparkHome(); + val sparkPath = Paths.get(sparkHome); + val hive122 = sparkHome + File.separator + "hive_1_2_2"; + val hive122Path = Paths.get(hive122); + val jar = hive122 + File.separator + "test.jar"; + val jarPath = Paths.get(jar); + Files.deleteIfExists(jarPath); + Files.deleteIfExists(hive122Path); + Files.deleteIfExists(sparkPath); + } + + @Test + void uploadHiveJars() throws IOException { + uploadHiveJars(false); + uploadHiveJars(true); + } + + void uploadHiveJars(boolean exist) throws IOException { + val config = KylinConfig.getInstanceFromEnv(); + val fileSystem = HadoopUtil.getWorkingFileSystem(); + val hive = new Path(config.getHdfsWorkingDirectory(), "hive"); + val uploadHiveJarFlag = new Path(config.getHdfsWorkingDirectory(), "upload_hive_jar"); + try { + config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", + config.getHdfsWorkingDirectory() + "hive/*"); + + if (exist) { + fileSystem.mkdirs(hive); + } + val kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath(); + val sparkSqlHiveMetastoreJarsPath = config.getSparkSqlHiveMetastoreJarsPath(); + val jarsDirPath = new Path(sparkSqlHiveMetastoreJarsPath).getParent(); + uploadHiveJarsTool.uploadHiveJars(fileSystem, uploadHiveJarFlag, kylinSparkHiveJarsPath, jarsDirPath); + + if (exist) { + ArgumentCaptor<LogEvent> logCaptor = ArgumentCaptor.forClass(LogEvent.class); + Mockito.verify(appender, Mockito.atLeast(0)).append(logCaptor.capture()); + val logs = logCaptor.getAllValues().stream() + .filter(event -> event.getLoggerName().equals("org.apache.kylin.tool.hive.HiveClientJarTool")) + .filter(event -> event.getLevel().equals(Level.WARN)) + .map(event -> event.getMessage().getFormattedMessage()).collect(Collectors.toList()); + val logCount = logs.stream() + .filter(log -> log.equals("HDFS dir [" + hive + "] exist, not upload hive client jar")).count(); + assertEquals(1, logCount); + } else { + val hdfsHiveJarsPath = new Path(config.getSparkSqlHiveMetastoreJarsPath().substring(0, + config.getSparkSqlHiveMetastoreJarsPath().length() - 1)); + val fileStatuses = HadoopUtil.getWorkingFileSystem().listStatus(hdfsHiveJarsPath); + assertEquals(1, fileStatuses.length); + val hdfsJarPath = fileStatuses[0].getPath(); + assertEquals(hdfsHiveJarsPath + "/test.jar", hdfsJarPath.toString()); + } + assertEquals(!exist, fileSystem.exists(uploadHiveJarFlag)); + } finally { + config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", ""); + fileSystem.delete(uploadHiveJarFlag, true); + fileSystem.delete(hive, true); + } + } + + @Test + void testExecute() throws IOException { + testExecute(true, false, "kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path not setting"); + testExecute(true, true, "Upload Spark HIVE jars success"); + testExecute(true, true, "Not need upload Spark HIVE jars again"); + testExecute(false, false, "Not need upload hive client jar"); + } + +} \ No newline at end of file diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java new file mode 100644 index 0000000000..c7302ab22d --- /dev/null +++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.hive; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.stream.Collectors; + +import org.apache.kylin.common.KylinConfig; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; + +import lombok.val; + +public class HiveClientJarToolTestBase { + @InjectMocks + protected HiveClientJarTool uploadHiveJarsTool = Mockito.spy(HiveClientJarTool.class); + @Mock + protected Appender appender = Mockito.mock(Appender.class); + + protected void before() throws IOException { + Mockito.when(appender.getName()).thenReturn("mocked"); + Mockito.when(appender.isStarted()).thenReturn(true); + ((Logger) LogManager.getRootLogger()).addAppender(appender); + } + + protected void after() throws IOException { + ((Logger) LogManager.getRootLogger()).removeAppender(appender); + } + + protected void testExecute(boolean upload, boolean setJarsPath, String message) throws IOException { + val config = KylinConfig.getInstanceFromEnv(); + try { + if (setJarsPath) { + config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", + config.getHdfsWorkingDirectory() + "hive/*"); + } + config.setProperty("kylin.engine.hive-client-jar-upload.enable", String.valueOf(upload)); + uploadHiveJarsTool.execute(); + ArgumentCaptor<LogEvent> logCaptor = ArgumentCaptor.forClass(LogEvent.class); + Mockito.verify(appender, Mockito.atLeast(0)).append(logCaptor.capture()); + val logs = logCaptor.getAllValues().stream() + .filter(event -> event.getLoggerName().equals("org.apache.kylin.tool.hive.HiveClientJarTool")) + .filter(event -> event.getLevel().equals(Level.INFO) || event.getLevel().equals(Level.WARN)) + .map(event -> event.getMessage().getFormattedMessage()).collect(Collectors.toList()); + val logCount = logs.stream().filter(log -> log.equals(message)).count(); + assertEquals(1, logCount); + } finally { + config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", ""); + config.setProperty("kylin.engine.hive-client-jar-upload.enable", "false"); + } + } +} diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java new file mode 100644 index 0000000000..f0d5595930 --- /dev/null +++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.tool.hive; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.junit.annotation.MetadataInfo; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.val; +import lombok.var; + +@MetadataInfo +class HiveClientJarToolWithoutSparkHiveDirTest extends HiveClientJarToolTestBase { + @BeforeEach + public void before() throws IOException { + super.before(); + } + + @AfterEach + public void after() throws IOException { + super.after(); + val sparkHome = KylinConfig.getSparkHome(); + val sparkPath = Paths.get(sparkHome); + val hive122 = sparkHome + File.separator + "hive_1_2_2"; + val hive122Path = Paths.get(hive122); + Files.deleteIfExists(hive122Path); + Files.deleteIfExists(sparkPath); + } + + @Test + void getKylinSparkHiveJarsPath() throws IOException { + val sparkHome = KylinConfig.getSparkHome(); + val sparkPath = Paths.get(sparkHome); + val hive122 = sparkHome + File.separator + "hive_1_2_2"; + val hive122Path = Paths.get(hive122); + try { + var kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath(); + assertTrue(StringUtils.isBlank(kylinSparkHiveJarsPath)); + Files.createDirectory(sparkPath); + Files.createFile(hive122Path); + kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath(); + assertTrue(StringUtils.isBlank(kylinSparkHiveJarsPath)); + + Files.deleteIfExists(hive122Path); + Files.createDirectory(hive122Path); + kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath(); + assertEquals(new File(hive122).getCanonicalPath(), kylinSparkHiveJarsPath); + } finally { + Files.deleteIfExists(hive122Path); + Files.deleteIfExists(sparkPath); + } + } + + @Test + void testExecute() throws IOException { + testExecute(true, true, "${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory"); + } + + @Test + void testExecuteWithHive122File() throws IOException { + val sparkHome = KylinConfig.getSparkHome(); + val sparkPath = Paths.get(sparkHome); + val hive122 = sparkHome + File.separator + "hive_1_2_2"; + val hive122Path = Paths.get(hive122); + Files.createDirectories(sparkPath); + Files.createFile(hive122Path); + testExecute(true, true, "${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory"); + } +}