This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7973ad7139f018294b76cf690e52f4e4df107d50
Author: jlf <longfei.ji...@kyligence.io>
AuthorDate: Thu Apr 27 20:41:05 2023 +0800

    KYLIN-5647 Put hive_1_2_2 to HDFS before KE start (#30313)
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 ++
 .../org/apache/kylin/common/util/FileUtils.java    |   9 +-
 .../apache/kylin/tool/hive/HiveClientJarTool.java  | 104 +++++++++++++++++
 .../kylin/tool/hive/HiveClientJarToolTest.java     | 130 +++++++++++++++++++++
 .../kylin/tool/hive/HiveClientJarToolTestBase.java |  77 ++++++++++++
 .../HiveClientJarToolWithoutSparkHiveDirTest.java  |  96 +++++++++++++++
 6 files changed, 423 insertions(+), 1 deletion(-)
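
For context, the patch below adds two config switches, kylin.engine.hive-client-jar-upload.enable and kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path, plus a HiveClientJarTool that copies ${KYLIN_HOME}/spark/hive_1_2_2 to HDFS once, guarded by an _upload_hive_jar_by_pass marker file. A minimal invocation sketch in Java, assuming kylin.properties is configured as in the comments (the property values are illustrative, not taken from this commit):

// Sketch only: how the new tool might be driven once before KE starts.
// Assumes kylin.properties contains (values illustrative):
//   kylin.engine.hive-client-jar-upload.enable=true
//   kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path=hdfs://cluster/kylin/working-dir/hive/*
// With the flag on, execute() copies every jar under ${KYLIN_HOME}/spark/hive_1_2_2
// into the parent of the configured jars path and then writes the
// _upload_hive_jar_by_pass marker so subsequent starts skip the upload.
public class HiveClientJarUploadExample {
    public static void main(String[] args) throws Exception {
        new org.apache.kylin.tool.hive.HiveClientJarTool().execute();
    }
}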

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 20cfa46a6c..ac7d894ee4 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1520,6 +1520,14 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.engine.spark-conf.spark.submit.deployMode", "client").toLowerCase(Locale.ROOT);
     }
 
+    public String getSparkSqlHiveMetastoreJarsPath() {
+        return getOptional("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", "");
+    }
+
+    public boolean getHiveClientJarUploadEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.hive-client-jar-upload.enable", FALSE));
+    }
+
     public String getSparkBuildClassName() {
         return getOptional("kylin.engine.spark.build-class-name", "org.apache.kylin.engine.spark.job.SegmentBuildJob");
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java b/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
index 9e9c4af186..a817726fc7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.commons.codec.binary.Base64;
-
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 
@@ -57,6 +56,14 @@ public final class FileUtils {
         return Lists.newArrayList();
     }
 
+    public static List<File> findFiles(String dir) {
+        File[] files = new File(dir).listFiles();
+        if (files != null) {
+            return Lists.newArrayList(files);
+        }
+        return Lists.newArrayList();
+    }
+
     public static Map<String, String> readFromPropertiesFile(File file) {
         try (FileInputStream fileInputStream = new FileInputStream(file)) {
             return readFromPropertiesFile(fileInputStream);
diff --git a/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java b/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java
new file mode 100644
index 0000000000..e2bc30b59b
--- /dev/null
+++ b/src/tool/src/main/java/org/apache/kylin/tool/hive/HiveClientJarTool.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.hive;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.FileUtils;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.tool.util.ToolMainWrapper;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HiveClientJarTool {
+    public static void main(String[] args) {
+        ToolMainWrapper.wrap(args, () -> {
+            HiveClientJarTool tool = new HiveClientJarTool();
+            tool.execute();
+        });
+        Unsafe.systemExit(0);
+    }
+
+    public void execute() throws IOException {
+        val config = KylinConfig.getInstanceFromEnv();
+        if (!config.getHiveClientJarUploadEnable()) {
+            log.info("Not need upload hive client jar");
+            return;
+        }
+        if (StringUtils.isBlank(config.getSparkSqlHiveMetastoreJarsPath())) {
+            log.warn("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path not setting");
+            return;
+        }
+
+        // In a read/write separation deployment the clusters have cross-domain mutual trust,
+        // so the read cluster FileSystem can be used to access the write cluster FileSystem
+        val fileSystem = HadoopUtil.getWorkingFileSystem();
+        val sparkSqlHiveMetastoreJarsPath = config.getSparkSqlHiveMetastoreJarsPath();
+        val jarsDirPath = new Path(sparkSqlHiveMetastoreJarsPath).getParent();
+        val uploadHiveJarFlag = new Path(jarsDirPath, "_upload_hive_jar_by_pass");
+        if (fileSystem.exists(uploadHiveJarFlag)) {
+            log.info("Not need upload Spark HIVE jars again");
+            return;
+        }
+
+        val kylinSparkHiveJarsPath = getKylinSparkHiveJarsPath();
+        if (StringUtils.isBlank(kylinSparkHiveJarsPath)) {
+            log.warn("${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory");
+            return;
+        }
+
+        uploadHiveJars(fileSystem, uploadHiveJarFlag, kylinSparkHiveJarsPath, jarsDirPath);
+    }
+
+    public void uploadHiveJars(FileSystem fileSystem, Path uploadHiveJarFlag, String kylinSparkHiveJarsPath,
+            Path jarsDirPath) throws IOException {
+        if (fileSystem.exists(jarsDirPath)) {
+            log.warn("HDFS dir [{}] exist, not upload hive client jar", jarsDirPath);
+            return;
+        }
+        fileSystem.mkdirs(jarsDirPath);
+        val hiveJars = FileUtils.findFiles(kylinSparkHiveJarsPath);
+        for (File jar : hiveJars) {
+            val sparkHiveJarPath = new Path(jar.getCanonicalPath());
+            fileSystem.copyFromLocalFile(sparkHiveJarPath, jarsDirPath);
+        }
+        log.info("Upload Spark HIVE jars ending");
+        try (val out = fileSystem.create(uploadHiveJarFlag, true)) {
+            out.write(new byte[] {});
+        }
+        log.info("Upload Spark HIVE jars success");
+    }
+
+    public String getKylinSparkHiveJarsPath() throws IOException {
+        String sparkHome = KylinConfig.getSparkHome();
+        val jar = FileUtils.findFile(sparkHome, "hive_1_2_2");
+        if (jar == null || jar.isFile()) {
+            return "";
+        }
+        return jar.getCanonicalPath();
+    }
+}
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java
new file mode 100644
index 0000000000..106efd6fc1
--- /dev/null
+++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.hive;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import lombok.val;
+
+@MetadataInfo
+class HiveClientJarToolTest extends HiveClientJarToolTestBase {
+
+    @BeforeEach
+    public void before() throws IOException {
+        super.before();
+        val sparkHome = KylinConfig.getSparkHome();
+        val sparkPath = Paths.get(sparkHome);
+        val hive122 = sparkHome + File.separator + "hive_1_2_2";
+        val hive122Path = Paths.get(hive122);
+        val jar = hive122 + File.separator + "test.jar";
+        val jarPath = Paths.get(jar);
+        Files.createDirectories(hive122Path);
+        Files.createFile(jarPath);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        super.after();
+        val sparkHome = KylinConfig.getSparkHome();
+        val sparkPath = Paths.get(sparkHome);
+        val hive122 = sparkHome + File.separator + "hive_1_2_2";
+        val hive122Path = Paths.get(hive122);
+        val jar = hive122 + File.separator + "test.jar";
+        val jarPath = Paths.get(jar);
+        Files.deleteIfExists(jarPath);
+        Files.deleteIfExists(hive122Path);
+        Files.deleteIfExists(sparkPath);
+    }
+
+    @Test
+    void uploadHiveJars() throws IOException {
+        uploadHiveJars(false);
+        uploadHiveJars(true);
+    }
+
+    void uploadHiveJars(boolean exist) throws IOException {
+        val config = KylinConfig.getInstanceFromEnv();
+        val fileSystem = HadoopUtil.getWorkingFileSystem();
+        val hive = new Path(config.getHdfsWorkingDirectory(), "hive");
+        val uploadHiveJarFlag = new Path(config.getHdfsWorkingDirectory(), "upload_hive_jar");
+        try {
+            config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path",
+                    config.getHdfsWorkingDirectory() + "hive/*");
+
+            if (exist) {
+                fileSystem.mkdirs(hive);
+            }
+            val kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath();
+            val sparkSqlHiveMetastoreJarsPath = config.getSparkSqlHiveMetastoreJarsPath();
+            val jarsDirPath = new Path(sparkSqlHiveMetastoreJarsPath).getParent();
+            uploadHiveJarsTool.uploadHiveJars(fileSystem, uploadHiveJarFlag, kylinSparkHiveJarsPath, jarsDirPath);
+
+            if (exist) {
+                ArgumentCaptor<LogEvent> logCaptor = ArgumentCaptor.forClass(LogEvent.class);
+                Mockito.verify(appender, Mockito.atLeast(0)).append(logCaptor.capture());
+                val logs = logCaptor.getAllValues().stream()
+                        .filter(event -> event.getLoggerName().equals("org.apache.kylin.tool.hive.HiveClientJarTool"))
+                        .filter(event -> event.getLevel().equals(Level.WARN))
+                        .map(event -> event.getMessage().getFormattedMessage()).collect(Collectors.toList());
+                val logCount = logs.stream()
+                        .filter(log -> log.equals("HDFS dir [" + hive + "] exist, not upload hive client jar")).count();
+                assertEquals(1, logCount);
+            } else {
+                val hdfsHiveJarsPath = new Path(config.getSparkSqlHiveMetastoreJarsPath().substring(0,
+                        config.getSparkSqlHiveMetastoreJarsPath().length() - 1));
+                val fileStatuses = HadoopUtil.getWorkingFileSystem().listStatus(hdfsHiveJarsPath);
+                assertEquals(1, fileStatuses.length);
+                val hdfsJarPath = fileStatuses[0].getPath();
+                assertEquals(hdfsHiveJarsPath + "/test.jar", hdfsJarPath.toString());
+            }
+            assertEquals(!exist, fileSystem.exists(uploadHiveJarFlag));
+        } finally {
+            config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", "");
+            fileSystem.delete(uploadHiveJarFlag, true);
+            fileSystem.delete(hive, true);
+        }
+    }
+
+    @Test
+    void testExecute() throws IOException {
+        testExecute(true, false, "kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path not setting");
+        testExecute(true, true, "Upload Spark HIVE jars success");
+        testExecute(true, true, "Not need upload Spark HIVE jars again");
+        testExecute(false, false, "Not need upload hive client jar");
+    }
+
+}
\ No newline at end of file
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java
new file mode 100644
index 0000000000..c7302ab22d
--- /dev/null
+++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.hive;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.Logger;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import lombok.val;
+
+public class HiveClientJarToolTestBase {
+    @InjectMocks
+    protected HiveClientJarTool uploadHiveJarsTool = Mockito.spy(HiveClientJarTool.class);
+    @Mock
+    protected Appender appender = Mockito.mock(Appender.class);
+
+    protected void before() throws IOException {
+        Mockito.when(appender.getName()).thenReturn("mocked");
+        Mockito.when(appender.isStarted()).thenReturn(true);
+        ((Logger) LogManager.getRootLogger()).addAppender(appender);
+    }
+
+    protected void after() throws IOException {
+        ((Logger) LogManager.getRootLogger()).removeAppender(appender);
+    }
+
+    protected void testExecute(boolean upload, boolean setJarsPath, String message) throws IOException {
+        val config = KylinConfig.getInstanceFromEnv();
+        try {
+            if (setJarsPath) {
+                config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path",
+                        config.getHdfsWorkingDirectory() + "hive/*");
+            }
+            config.setProperty("kylin.engine.hive-client-jar-upload.enable", String.valueOf(upload));
+            uploadHiveJarsTool.execute();
+            ArgumentCaptor<LogEvent> logCaptor = ArgumentCaptor.forClass(LogEvent.class);
+            Mockito.verify(appender, Mockito.atLeast(0)).append(logCaptor.capture());
+            val logs = logCaptor.getAllValues().stream()
+                    .filter(event -> event.getLoggerName().equals("org.apache.kylin.tool.hive.HiveClientJarTool"))
+                    .filter(event -> event.getLevel().equals(Level.INFO) || event.getLevel().equals(Level.WARN))
+                    .map(event -> event.getMessage().getFormattedMessage()).collect(Collectors.toList());
+            val logCount = logs.stream().filter(log -> log.equals(message)).count();
+            assertEquals(1, logCount);
+        } finally {
+            config.setProperty("kylin.engine.spark-conf.spark.sql.hive.metastore.jars.path", "");
+            config.setProperty("kylin.engine.hive-client-jar-upload.enable", "false");
+        }
+    }
+}
diff --git a/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java
new file mode 100644
index 0000000000..f0d5595930
--- /dev/null
+++ b/src/tool/src/test/java/org/apache/kylin/tool/hive/HiveClientJarToolWithoutSparkHiveDirTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.hive;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.val;
+import lombok.var;
+
+@MetadataInfo
+class HiveClientJarToolWithoutSparkHiveDirTest extends HiveClientJarToolTestBase {
+    @BeforeEach
+    public void before() throws IOException {
+        super.before();
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        super.after();
+        val sparkHome = KylinConfig.getSparkHome();
+        val sparkPath = Paths.get(sparkHome);
+        val hive122 = sparkHome + File.separator + "hive_1_2_2";
+        val hive122Path = Paths.get(hive122);
+        Files.deleteIfExists(hive122Path);
+        Files.deleteIfExists(sparkPath);
+    }
+
+    @Test
+    void getKylinSparkHiveJarsPath() throws IOException {
+        val sparkHome = KylinConfig.getSparkHome();
+        val sparkPath = Paths.get(sparkHome);
+        val hive122 = sparkHome + File.separator + "hive_1_2_2";
+        val hive122Path = Paths.get(hive122);
+        try {
+            var kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath();
+            assertTrue(StringUtils.isBlank(kylinSparkHiveJarsPath));
+            Files.createDirectory(sparkPath);
+            Files.createFile(hive122Path);
+            kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath();
+            assertTrue(StringUtils.isBlank(kylinSparkHiveJarsPath));
+
+            Files.deleteIfExists(hive122Path);
+            Files.createDirectory(hive122Path);
+            kylinSparkHiveJarsPath = uploadHiveJarsTool.getKylinSparkHiveJarsPath();
+            assertEquals(new File(hive122).getCanonicalPath(), kylinSparkHiveJarsPath);
+        } finally {
+            Files.deleteIfExists(hive122Path);
+            Files.deleteIfExists(sparkPath);
+        }
+    }
+
+    @Test
+    void testExecute() throws IOException {
+        testExecute(true, true, "${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory");
+    }
+
+    @Test
+    void testExecuteWithHive122File() throws IOException {
+        val sparkHome = KylinConfig.getSparkHome();
+        val sparkPath = Paths.get(sparkHome);
+        val hive122 = sparkHome + File.separator + "hive_1_2_2";
+        val hive122Path = Paths.get(hive122);
+        Files.createDirectories(sparkPath);
+        Files.createFile(hive122Path);
+        testExecute(true, true, "${KYLIN_HOME}/spark/hive_1_2_2 needs to be an existing directory");
+    }
+}
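
Following the tests above, a quick stand-alone check of what actually landed on HDFS can be useful; a sketch only, assuming the same jars-path convention ending in "/*" that the tool and tests use:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;

// Sketch only (not part of this commit): list the uploaded hive client jars.
public class ListUploadedHiveClientJars {
    public static void main(String[] args) throws Exception {
        String jarsPath = KylinConfig.getInstanceFromEnv().getSparkSqlHiveMetastoreJarsPath();
        // getParent() drops the trailing "*" component, leaving the upload directory.
        Path jarsDir = new Path(jarsPath).getParent();
        for (FileStatus status : HadoopUtil.getWorkingFileSystem().listStatus(jarsDir)) {
            System.out.println(status.getPath());
        }
    }
}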
