This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new 5187aff  KYLIN-5069 Refactor hive and hadoop dependency of kylin4
5187aff is described below

commit 5187affe180fc88a8422cc193097d1ef9614ce89
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Thu Oct 14 11:37:41 2021 +0800

    KYLIN-5069 Refactor hive and hadoop dependency of kylin4
---
 build/bin/check-hadoop-env.sh                       |  88 ++++++
 build/bin/find-hive-dependency.sh                   | 214 ---------------
 build/bin/kylin.sh                                  |  80 +-----
 build/bin/prepare_hadoop_dependency.sh              | 192 +++++++++++++
 build/bin/replace-jars-under-spark.sh               | 149 ----------
 .../org/apache/kylin/common/KylinConfigBase.java    |   2 +-
 .../kylin/common/util/HiveCmdBuilderTest.java       | 118 --------
 .../kylin/spark/classloader/ClassLoaderUtils.java   |  20 --
 .../kylin/spark/classloader/SparkClassLoader.java   | 212 ---------------
 .../kylin/spark/classloader/TomcatClassLoader.java  |   9 -
 .../org/apache/spark/sql/SparderContext.scala       |  26 +-
 source-hive/pom.xml                                 |  11 +-
 .../kylin/source/hive/BeelineHiveClient.java        | 300 ---------------------
 .../apache/kylin/source/hive/CLIHiveClient.java     | 191 -------------
 .../kylin/source/hive/HiveClientFactory.java        |   6 +-
 .../apache/kylin/source/hive/HiveTableMeta.java     |   4 +-
 .../kylin/source/hive/HiveTableMetaBuilder.java     |   4 +-
 .../apache/kylin/source/hive/SparkHiveClient.java   | 131 +++++++++
 .../kylin/source/hive/BeelineHIveClientTest.java    |  51 ----
 19 files changed, 437 insertions(+), 1371 deletions(-)

diff --git a/build/bin/check-hadoop-env.sh b/build/bin/check-hadoop-env.sh
new file mode 100644
index 0000000..5c88501
--- /dev/null
+++ b/build/bin/check-hadoop-env.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +cdh_mapreduce_path="/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce" +hadoop_lib_path="/usr/lib/hadoop" +emr_spark_lib_path="/usr/lib/spark" +hdi3_flag_path="/usr/hdp/current/hdinsight-zookeeper" + +cdh_version=`hadoop version | head -1 | awk -F '-' '{print $2}'` + +function is_cdh_6_x() { + if [ -d ${cdh_mapreduce_path}/../hadoop/ ]; then + hadoop_common_file=`find ${cdh_mapreduce_path}/../hadoop/ -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1` + cdh_version=${hadoop_common_file##*-} + + if [[ "${cdh_version}" == cdh6.* ]]; then + echo 1 + return 1 + fi + fi + echo 0 + return 0 +} + +hdp_hadoop_path=$HDP_HADOOP_HOME +if [[ -z ${hdp_hadoop_path} ]] +then + if [[ -d "/usr/hdp/current/hadoop-client" ]]; then + hdp_hadoop_path="/usr/hdp/current/hadoop-client" + fi +fi + +if [ -d $hadoop_lib_path ]; then + # hadoop-common-3.2.1-amzn-0.jar + hadoop_common_file=$(find $hadoop_lib_path -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1) + emr_version_1=${hadoop_common_file##*common-} + arrVersion=(${emr_version_1//-/ }) +fi + +function is_aws_emr() { + if [[ "${arrVersion[1]}" == *amzn* ]]; then + echo 1 + return 1 + fi + echo 0 + return 0 +} + +function is_aws_emr_6() { + if [[ "${arrVersion[0]}" == 3.* && "${arrVersion[1]}" == *amzn* ]]; then + echo 1 + return 1 + fi + echo 0 + return 0 +} + +function is_hdi_3_x() { + # get hdp_version + if [ -z "${hdp_version}" ]; then + hdp_version=`/bin/bash -x hadoop 2>&1 | sed -n "s/\(.*\)export HDP_VERSION=\(.*\)/\2/"p` + verbose "hdp_version is ${hdp_version}" + fi + + if [[ -d "/usr/hdp/current/hdinsight-zookeeper" && $hdp_version == "2"* ]];then + echo 1 + return 1 + fi + + echo 0 + return 0 +} \ No newline at end of file diff --git a/build/bin/find-hive-dependency.sh b/build/bin/find-hive-dependency.sh deleted file mode 100755 index 31530e5..0000000 --- a/build/bin/find-hive-dependency.sh +++ /dev/null @@ -1,214 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -source ${KYLIN_HOME:-"$(cd -P -- "$(dirname -- "$0")" && pwd -P)/../"}/bin/header.sh - -## ${dir} assigned to $KYLIN_HOME/bin in header.sh -source ${dir}/load-hive-conf.sh - -echo Retrieving hive dependency... 
- -client_mode=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.client` -hive_env= - -if [ "${client_mode}" == "beeline" ] -then - beeline_shell=`$KYLIN_HOME/bin/get-properties.sh kylin.source.hive.beeline-shell` - beeline_params=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.beeline-params` - hive_env=`${beeline_shell} ${hive_conf_properties} ${beeline_params} --outputformat=dsv -e "set;" 2>&1 | grep --text 'env:CLASSPATH' ` -else - source ${dir}/check-hive-usability.sh - hive_env=`hive ${hive_conf_properties} -e set 2>&1 | grep 'env:CLASSPATH'` -fi - -if [ -z $hive_env ] -then - hive_permission=`hive ${hive_conf_properties} -e set 2>&1 | grep 'No valid credentials provided'` - if [ -n "$hive_permission" ] - then - quit "No valid credentials provided for Hive CLI, please check permission of hive. (e.g. check if Kerberos is expired or not)" - else - quit "Something wrong with Hive CLI or Beeline, please execute Hive CLI or Beeline CLI in terminal to find the root cause." - fi -fi - -hive_classpath=`echo $hive_env | grep 'env:CLASSPATH' | awk -F '=' '{print $2}'` -arr=(`echo $hive_classpath | cut -d ":" -f 1- | sed 's/:/ /g'`) -hive_conf_path= -hive_exec_path= - -if [ -n "$HIVE_CONF" ] -then - verbose "HIVE_CONF is set to: $HIVE_CONF, use it to locate hive configurations." - hive_conf_path=$HIVE_CONF -fi - -for data in ${arr[@]} -do - result=`echo $data | grep -e 'hive-exec[a-z0-9A-Z\.-]*.jar' | grep -v 'auxlib'` - # In some cases there are more than one lib dirs, only the first one will be applied. - if [ $result ] && [ -z "$hive_exec_path" ] - then - hive_exec_path=$data - fi - - # in some versions of hive config is not in hive's classpath, find it separately - if [ -z "$hive_conf_path" ] - then - result=`echo $data | grep -e 'hive[^/]*/conf'` - if [ $result ] - then - hive_conf_path=$data - fi - fi -done - -if [ -z "$hive_conf_path" ] -then - quit "Couldn't find hive configuration directory. Please set HIVE_CONF to the path which contains hive-site.xml." -fi - -if [ -z "$hive_exec_path" ] -then - quit "Couldn't find hive executable jar. Please check if hive executable jar exists in HIVE_LIB folder." -fi - -# in some versions of hive hcatalog is not in hive's classpath, find it separately -if [ -z "$HCAT_HOME" ] -then - verbose "HCAT_HOME not found, try to find hcatalog path from hadoop home" - hadoop_home=`echo $hive_exec_path | awk -F '/hive.*/lib/hive-exec[a-z0-9A-Z.-]*.jar' '{print $1}'` - hive_home=`echo $hive_exec_path | awk -F '/lib/hive-exec[a-z0-9A-Z.-]*.jar' '{print $1}'` - is_aws=`uname -r | grep amzn` - if [ -d "${hadoop_home}/hive-hcatalog" ]; then - hcatalog_home=${hadoop_home}/hive-hcatalog - elif [ -d "${hadoop_home}/hive/hcatalog" ]; then - hcatalog_home=${hadoop_home}/hive/hcatalog - elif [ -d "${hive_home}/hcatalog" ]; then - hcatalog_home=${hive_home}/hcatalog - elif [ -n is_aws ] && [ -d "/usr/lib/hive-hcatalog" ]; then - # special handling for Amazon EMR - hcatalog_home=/usr/lib/hive-hcatalog - else - quit "Couldn't locate hcatalog installation, please make sure it is installed and set HCAT_HOME to the path." 
- fi -else - verbose "HCAT_HOME is set to: $HCAT_HOME, use it to find hcatalog path:" - hcatalog_home=${HCAT_HOME} -fi - -hcatalog=`find -L ${hcatalog_home} -name "hive-hcatalog-core[0-9\.-]*.jar" 2>&1 | grep -m 1 -v 'Permission denied'` - -if [ -z "$hcatalog" ] -then - quit "hcatalog lib not found" -fi - -function checkFileExist() -{ - msg_hint="" - if [ "$1" == "hive_lib" ] - then - msg_hint=", please check jar files in current HIVE_LIB or export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'" - elif [ "$1" == "hcatalog" ] - then - msg_hint=", please check jar files in current HCAT_HOME or export HCAT_HOME='YOUR_LOCAL_HCAT_HOME'" - fi - - if [ -z "$2" ] - then - if [ "$1" == "hive_lib" ] - then - quit "Current HIVE_LIB is not valid, please export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'" - elif [ "$1" == "hcatalog" ] - then - quit "Current HCAT_HOME is not valid, please export HCAT_HOME='YOUR_LOCAL_HCAT_HOME'" - fi - fi - - files=(`echo $2 | cut -d ":" -f 1- | sed 's/:/ /g'`) - misFiles=0 - outputMissFiles= - for file in ${files} - do - let allFiles++ - if [ ! -f "${file}" ]; then - outputMissFiles=${outputMissFiles}${file}", " - let misFiles++ - fi - done - if [ 0 != ${misFiles} ]; then - times=`expr ${allFiles} / ${misFiles}` - [[ ${times} -gt 10 ]] || quit "A couple of hive jars can't be found: ${outputMisFiles}${msg_hint}" - fi -} - -function validateDirectory() -{ - conf_path=$1 - [[ -d "${conf_path}" ]] || quit "${conf_path} doesn't exist!" - unit=${conf_path: -1} - [[ "${unit}" == "/" ]] || conf_path=${conf_path}"/" - - find="false" - filelist=`ls ${conf_path}` - for file in $filelist - do - if [ "${file}" == "hive-site.xml" ] - then - find="true" - break - fi - done - [[ "${find}" == "true" ]] || quit "ERROR, no hive-site.xml found under dir: ${conf_path}!" -} - -if [ -z "$HIVE_LIB" ] -then - verbose "HIVE_LIB is not set, try to retrieve hive lib from hive_exec_path" - if [[ $hive_exec_path =~ ^\/.*hive.*\/lib\/hive-exec[a-z0-9A-Z\.-]*.jar ]] - then - hive_lib_dir="$(dirname $hive_exec_path)" - else - quit "HIVE_LIB not found, please check hive installation or export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'." - fi -else - if [[ $HIVE_LIB =~ ^\/.*hive.*\/lib[\/]* ]] - then - verbose "HIVE_LIB is set to ${HIVE_LIB}" - else - echo "WARNING: HIVE_LIB is set to ${HIVE_LIB}, it's advised to set it to the lib dir under hive's installation directory" - fi - hive_lib_dir="$HIVE_LIB" -fi - -hive_lib=`find -L ${hive_lib_dir} -name '*.jar' ! -name '*druid*' ! -name '*slf4j*' ! -name '*avatica*' ! -name '*calcite*' \ - ! -name '*jackson-datatype-joda*' ! -name '*derby*' ! -name "*jetty*" ! -name "*jsp*" ! -name "*servlet*" ! -name "*hbase*" ! -name "*websocket*" \ - -printf '%p:' | sed 's/:$//'` - -validateDirectory ${hive_conf_path} -checkFileExist hive_lib ${hive_lib} -checkFileExist hcatalog ${hcatalog} - -hive_dependency=${hive_conf_path}:${hive_lib}:${hcatalog} -verbose "hive dependency is $hive_dependency" -export hive_dependency -export hive_conf_path -echo "export hive_dependency=$hive_dependency -export hive_conf_path=$hive_conf_path" > ${dir}/cached-hive-dependency.sh diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index a110c8c..a20df10 100755 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -33,11 +33,9 @@ mkdir -p ${KYLIN_HOME}/ext source ${dir}/set-java-home.sh function retrieveDependency() { - #retrive $hive_dependency and $hbase_dependency if [[ -z $reload_dependency && `ls -1 ${dir}/cached-* 2>/dev/null | wc -l` -eq 6 ]] then echo "Using cached dependency..." 
- source ${dir}/cached-hive-dependency.sh #retrive $hbase_dependency metadataUrl=`${dir}/get-properties.sh kylin.metadata.url` if [[ "${metadataUrl##*@}" == "hbase" ]] @@ -45,11 +43,8 @@ function retrieveDependency() { source ${dir}/cached-hbase-dependency.sh fi source ${dir}/cached-hadoop-conf-dir.sh - # source ${dir}/cached-kafka-dependency.sh source ${dir}/cached-spark-dependency.sh - # source ${dir}/cached-flink-dependency.sh else - source ${dir}/find-hive-dependency.sh #retrive $hbase_dependency metadataUrl=`${dir}/get-properties.sh kylin.metadata.url` if [[ "${metadataUrl##*@}" == "hbase" ]] @@ -57,72 +52,16 @@ function retrieveDependency() { source ${dir}/find-hbase-dependency.sh fi source ${dir}/find-hadoop-conf-dir.sh - # source ${dir}/find-kafka-dependency.sh source ${dir}/find-spark-dependency.sh - # source ${dir}/find-flink-dependency.sh fi - # Replace jars for different hadoop dist - bash ${dir}/replace-jars-under-spark.sh - # get hdp_version if [ -z "${hdp_version}" ]; then hdp_version=`/bin/bash -x hadoop 2>&1 | sed -n "s/\(.*\)export HDP_VERSION=\(.*\)/\2/"p` verbose "hdp_version is ${hdp_version}" fi - # Replace jars for HDI - KYLIN_SPARK_JARS_HOME="${KYLIN_HOME}/spark/jars" - if [[ -d "/usr/hdp/current/hdinsight-zookeeper" && $hdp_version == "2"* ]] - then - echo "The current Hadoop environment is HDI3, will replace some jars package for ${KYLIN_HOME}/spark/jars" - if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin.war ]] - then - if [[ ! -d ${KYLIN_HOME}/tomcat/webapps/kylin ]] - then - mkdir ${KYLIN_HOME}/tomcat/webapps/kylin - fi - mv ${KYLIN_HOME}/tomcat/webapps/kylin.war ${KYLIN_HOME}/tomcat/webapps/kylin - cd ${KYLIN_HOME}/tomcat/webapps/kylin - jar -xf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war - if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar ]] - then - echo "Remove ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar to avoid version conflicts" - rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar - rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war - cd ${KYLIN_HOME}/ - fi - fi - - if [[ -d "${KYLIN_SPARK_JARS_HOME}" ]] - then - if [[ -f ${KYLIN_HOME}/hdi3_spark_jars_flag ]] - then - echo "Required jars have been added to ${KYLIN_HOME}/spark/jars, skip this step." - else - rm -rf ${KYLIN_HOME}/spark/jars/hadoop-* - cp /usr/hdp/current/spark2-client/jars/hadoop-* $KYLIN_SPARK_JARS_HOME - cp /usr/hdp/current/spark2-client/jars/azure-* $KYLIN_SPARK_JARS_HOME - cp /usr/hdp/current/hadoop-client/lib/microsoft-log4j-etwappender-1.0.jar $KYLIN_SPARK_JARS_HOME - cp /usr/hdp/current/hadoop-client/lib/hadoop-lzo-0.6.0.${hdp_version}.jar $KYLIN_SPARK_JARS_HOME - - rm -rf $KYLIN_HOME/spark/jars/guava-14.0.1.jar - cp /usr/hdp/current/spark2-client/jars/guava-24.1.1-jre.jar $KYLIN_SPARK_JARS_HOME - - echo "Upload spark jars to HDFS" - hdfs dfs -test -d /spark2_jars - if [ $? -eq 1 ] - then - hdfs dfs -mkdir /spark2_jars - fi - hdfs dfs -put $KYLIN_SPARK_JARS_HOME/* /spark2_jars - - touch ${KYLIN_HOME}/hdi3_spark_jars_flag - fi - else - echo "${KYLIN_HOME}/spark/jars dose not exist. You can run ${KYLIN_HOME}/download-spark.sh to download spark." 
- fi - fi + source ${KYLIN_HOME}/bin/prepare_hadoop_dependency.sh tomcat_root=${dir}/../tomcat export tomcat_root @@ -141,26 +80,15 @@ function retrieveDependency() { spring_profile="${spring_profile},${additional_security_profiles}" fi - # compose hadoop_dependencies - hadoop_dependencies=${hadoop_dependencies}:`hadoop classpath` - if [ -n "${hive_dependency}" ]; then - hadoop_dependencies=${hive_dependency}:${hadoop_dependencies} - fi - if [ -n "${kafka_dependency}" ]; then - hadoop_dependencies=${hadoop_dependencies}:${kafka_dependency} - fi - # compose KYLIN_TOMCAT_CLASSPATH tomcat_classpath=${tomcat_root}/bin/bootstrap.jar:${tomcat_root}/bin/tomcat-juli.jar:${tomcat_root}/lib/* - export KYLIN_TOMCAT_CLASSPATH=${tomcat_classpath}:${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${hadoop_dependencies}:${flink_dependency} + export KYLIN_TOMCAT_CLASSPATH=${tomcat_classpath}:${KYLIN_HOME}/conf:${KYLIN_HOME}/hadoop_conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${SPARK_HOME}/jars/* # compose KYLIN_TOOL_CLASSPATH - export KYLIN_TOOL_CLASSPATH=${KYLIN_HOME}/conf:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${hadoop_dependencies} + export KYLIN_TOOL_CLASSPATH=${KYLIN_HOME}/conf:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${SPARK_HOME}/jars/* # compose kylin_common_opts - kylin_common_opts="-Dkylin.hive.dependency=${hive_dependency} \ - -Dkylin.kafka.dependency=${kafka_dependency} \ - -Dkylin.hadoop.conf.dir=${kylin_hadoop_conf_dir} \ + kylin_common_opts="-Dkylin.hadoop.conf.dir=${kylin_hadoop_conf_dir} \ -Dkylin.server.host-address=${KYLIN_REST_ADDRESS} \ -Dspring.profiles.active=${spring_profile} \ -Dhdp.version=${hdp_version}" diff --git a/build/bin/prepare_hadoop_dependency.sh b/build/bin/prepare_hadoop_dependency.sh new file mode 100644 index 0000000..4ef1f30 --- /dev/null +++ b/build/bin/prepare_hadoop_dependency.sh @@ -0,0 +1,192 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source ${KYLIN_HOME}/bin/check-hadoop-env.sh + + +BYPASS=${SPARK_HOME}/jars/replace-jars-bypass + +if [[ -f ${BYPASS} ]] +then + return +fi + +if [ ! -d "$KYLIN_HOME/spark" ]; then + echo "Skip spark which not owned by kylin. SPARK_HOME is $SPARK_HOME and KYLIN_HOME is $KYLIN_HOME ." + exit 0 +fi + +echo "Start replace hadoop jars under ${KYLIN_HOME}/spark/jars." 
+ +hadoop_lib=${KYLIN_HOME}/spark/jars + +common_jars= +hdfs_jars= +mr_jars= +yarn_jars= +other_jars= + +function cdh_replace_jars() { + common_jars=$(find $cdh_mapreduce_path/../hadoop -maxdepth 2 \ + -name "hadoop-annotations-*.jar" -not -name "*test*" \ + -o -name "hadoop-auth-*.jar" -not -name "*test*" \ + -o -name "hadoop-common-*.jar" -not -name "*test*") + + hdfs_jars=$(find $cdh_mapreduce_path/../hadoop-hdfs -maxdepth 1 -name "hadoop-hdfs-*" -not -name "*test*" -not -name "*nfs*") + + mr_jars=$(find $cdh_mapreduce_path -maxdepth 1 \ + -name "hadoop-mapreduce-client-app-*.jar" -not -name "*test*" \ + -o -name "hadoop-mapreduce-client-common-*.jar" -not -name "*test*" \ + -o -name "hadoop-mapreduce-client-jobclient-*.jar" -not -name "*test*" \ + -o -name "hadoop-mapreduce-client-shuffle-*.jar" -not -name "*test*" \ + -o -name "hadoop-mapreduce-client-core-*.jar" -not -name "*test*") + + yarn_jars=$(find $cdh_mapreduce_path/../hadoop-yarn -maxdepth 1 \ + -name "hadoop-yarn-api-*.jar" -not -name "*test*" \ + -o -name "hadoop-yarn-client-*.jar" -not -name "*test*" \ + -o -name "hadoop-yarn-common-*.jar" -not -name "*test*" \ + -o -name "hadoop-yarn-server-common-*.jar" -not -name "*test*" \ + -o -name "hadoop-yarn-server-web-proxy-*.jar" -not -name "*test*") + + other_jars=$(find $cdh_mapreduce_path/../../jars -maxdepth 1 -name "htrace-core4*" || find $cdh_mapreduce_path/../hadoop -maxdepth 2 -name "htrace-core4*") + + if [[ $(is_cdh_6_x) == 1 ]]; then + cdh6_jars=$(find ${cdh_mapreduce_path}/../../jars -maxdepth 1 \ + -name "woodstox-core-*.jar" -o -name "stax2-*.jar" -o -name "commons-configuration2-*.jar" -o -name "re2j-*.jar" ) + fi +} + +function emr_replace_jars() { + common_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 \ + -name "hadoop-*.jar" -not -name "*test*" \ + -o -name "htrace-core4*" \ + -o -name "emr-spark-goodies*") + + other_jars=$(find ${hadoop_lib_path}/lib/ -maxdepth 1 \ + -name "woodstox-core-*.jar" \ + -o -name "stax2-api-3*.jar") + + lzo_jars=$(find ${hadoop_lib_path}/../hadoop-lzo/lib/ -maxdepth 1 \ + -name "hadoop-lzo-*.jar" ) + + if [[ $(is_aws_emr_6) == 1 ]]; then + emr6_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 \ + -name "re2j-*.jar" -not -name "*test*" \ + -o -name "commons-configuration2-*" ) + fi +} + +function hdi_replace_jars() { + common_jars=$(find ${hdi3_flag_path}/../spark2-client/ -maxdepth 2 \ + -name "hadoop-*.jar" -not -name "*test*" \ + -o -name "azure-*.jar" -not -name "*test*" \ + -o -name "guava-*.jar") + + other_jars=$(find ${hdi3_flag_path}/../hadoop-client/ -maxdepth 2 \ + -name "microsoft-log4j-etwappender-*.jar") + + lzo_jars=$(find ${hdi3_flag_path}/../hadoop-client/ -maxdepth 2 \ + -name "hadoop-lzo-*.jar" ) +} + +if [ -d "$cdh_mapreduce_path" ] +then + cdh_replace_jars +elif [[ $(is_aws_emr) == 1 ]] +then + emr_replace_jars +elif [[ $(is_hdi_3_x) == 1 ]] +then + hdi_replace_jars +else + touch "${BYPASS}" +fi + +jar_list="${common_jars} ${hdfs_jars} ${mr_jars} ${yarn_jars} ${other_jars} ${cdh6_jars} ${emr6_jars} ${lzo_jars}" + +echo "Find platform specific jars:${jar_list}, will replace with these jars under ${SPARK_HOME}/jars." 
+ +if [[ $(is_aws_emr_6) == 1 ]]; then + find ${SPARK_HOME}/jars -name "hive-exec-*.jar" -exec rm -f {} \; + hive_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 -name "hive-exec-*.jar") + cp ${hive_jars} ${SPARK_HOME}/jars + configuration_jars=$(find ${emr_spark_lib_path}/../ -name "commons-configuration-1.10*.jar") + cp ${configuration_jars} ${KYLIN_HOME}/lib +fi + +if [ $(is_cdh_6_x) == 1 ]; then + if [ -d "${KYLIN_HOME}/bin/hadoop3_jars/cdh6" ]; then + find ${SPARK_HOME}/jars -name "hive-exec-*.jar" -exec rm -f {} \; + echo "Copy jars from ${KYLIN_HOME}/bin/hadoop3_jars/cdh6" + cp ${KYLIN_HOME}/hadoop3_jars/cdh6/*.jar ${SPARK_HOME}/jars + fi +fi + +if [ ! -f ${BYPASS} ]; then + find ${SPARK_HOME}/jars -name "htrace-core-*" -exec rm -rf {} \; + find ${SPARK_HOME}/jars -name "hadoop-*.jar" -exec rm -f {} \; +fi + +for jar_file in ${jar_list} +do + `cp ${jar_file} ${SPARK_HOME}/jars` +done + +if [[ (${is_emr} == 1) || ($(is_cdh_6_x) == 1)]]; then + log4j_jars=$(find ${SPARK_HOME}/jars/ -maxdepth 2 -name "slf4j-*.jar") + cp ${log4j_jars} ${KYLIN_HOME}/ext +fi + +if [ $(is_hdi_3_x) == 1 ]; then + if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin.war ]]; then + if [[ ! -d ${KYLIN_HOME}/tomcat/webapps/kylin ]] + then + mkdir ${KYLIN_HOME}/tomcat/webapps/kylin + fi + mv ${KYLIN_HOME}/tomcat/webapps/kylin.war ${KYLIN_HOME}/tomcat/webapps/kylin + cd ${KYLIN_HOME}/tomcat/webapps/kylin + jar -xf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war + if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar ]] + then + echo "Remove ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar to avoid version conflicts" + rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar + rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war + cd ${KYLIN_HOME}/ + fi + fi + find ${SPARK_HOME}/jars -name "guava-14*.jar" -exec rm -f {} \; + echo "Upload spark jars to HDFS" + hdfs dfs -test -d /spark2_jars + if [ $? -eq 1 ]; then + hdfs dfs -mkdir /spark2_jars + fi + hdfs dfs -put ${SPARK_HOME}/jars/* /spark2_jars +fi + +# Remove all spaces +jar_list=${jar_list// /} + +if [[ (-z "${jar_list}") && (! -f ${BYPASS}) ]] +then + echo "Please confirm that the corresponding hadoop jars have been replaced. The automatic replacement program cannot be executed correctly." +else + touch "${BYPASS}" +fi + +echo "Done hadoop jars replacement under ${SPARK_HOME}/jars." diff --git a/build/bin/replace-jars-under-spark.sh b/build/bin/replace-jars-under-spark.sh deleted file mode 100644 index 01f9854..0000000 --- a/build/bin/replace-jars-under-spark.sh +++ /dev/null @@ -1,149 +0,0 @@ -#!/bin/bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# check https://cwiki.apache.org/confluence/display/KYLIN/Deploy+Kylin+4+on+CDH+6 - -BYPASS=${KYLIN_HOME}/spark/jars/replace-jars-bypass -cdh_mapreduce_path="/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce" -hadoop_lib_path="/usr/lib/hadoop" - -if [ -f ${BYPASS} ]; then - exit 0 -fi - -if [ ! -d "$KYLIN_HOME/spark" ]; then - echo "Skip spark which not owned by kylin. SPARK_HOME is $SPARK_HOME and KYLIN_HOME is $KYLIN_HOME ." - exit 0 -fi - -echo "Start replacing hadoop jars under ${SPARK_HOME}/jars." - -function check_cdh_hadoop() { - # hadoop-common-3.0.0-cdh6.2.0.jar - hadoop_common_file=$(find ${cdh_mapreduce_path}/../hadoop/ -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1) - cdh_version=${hadoop_common_file##*-} - if [[ "${cdh_version}" == cdh6.* ]]; then - export is_cdh6=1 - else - export is_cdh6=0 - fi - if [[ "${cdh_version}" == cdh5.* ]]; then - export is_cdh5=1 - else - export is_cdh5=0 - fi -} - -function check_aws_emr() { - if [ ! -d $hadoop_lib_path ]; then - return 0 - fi - - # hadoop-common-3.2.1-amzn-0.jar - hadoop_common_file=$(find $hadoop_lib_path -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1) - emr_version_1=${hadoop_common_file##*common-} - echo $emr_version_1 - arrVersion=(${emr_version_1//-/ }) - - if [[ "${arrVersion[0]}" == 3.* && "${arrVersion[1]}" == *amzn* ]]; then - export is_emr6=1 - else - export is_emr6=0 - fi - - if [[ "${arrVersion[0]}" == 2.* && "${arrVersion[1]}" == *amzn* ]]; then - export is_emr5=1 - else - export is_emr5=0 - fi -} - -check_cdh_hadoop -check_aws_emr - -common_jars= -hdfs_jars= -mr_jars= -yarn_jars= -other_jars= - -if [ $is_cdh6 == 1 ]; then - common_jars=$(find $cdh_mapreduce_path/../hadoop -maxdepth 2 \ - -name "hadoop-annotations-*.jar" -not -name "*test*" \ - -o -name "hadoop-auth-*.jar" -not -name "*test*" \ - -o -name "hadoop-common-*.jar" -not -name "*test*") - - hdfs_jars=$(find $cdh_mapreduce_path/../hadoop-hdfs -maxdepth 1 -name "hadoop-hdfs-*" -not -name "*test*" -not -name "*nfs*") - - mr_jars=$(find $cdh_mapreduce_path -maxdepth 1 \ - -name "hadoop-mapreduce-client-app-*.jar" -not -name "*test*" \ - -o -name "hadoop-mapreduce-client-common-*.jar" -not -name "*test*" \ - -o -name "hadoop-mapreduce-client-jobclient-*.jar" -not -name "*test*" \ - -o -name "hadoop-mapreduce-client-shuffle-*.jar" -not -name "*test*" \ - -o -name "hadoop-mapreduce-client-core-*.jar" -not -name "*test*") - - yarn_jars=$(find $cdh_mapreduce_path/../hadoop-yarn -maxdepth 1 \ - -name "hadoop-yarn-api-*.jar" -not -name "*test*" \ - -o -name "hadoop-yarn-client-*.jar" -not -name "*test*" \ - -o -name "hadoop-yarn-common-*.jar" -not -name "*test*" \ - -o -name "hadoop-yarn-server-common-*.jar" -not -name "*test*" \ - -o -name "hadoop-yarn-server-web-proxy-*.jar" -not -name "*test*") - - other_jars=$(find $cdh_mapreduce_path/../../jars -maxdepth 1 -name "htrace-core4*" || find $cdh_mapreduce_path/../hadoop -maxdepth 2 -name "htrace-core4*") - - if [[ $is_cdh6 == 1 ]]; then - cdh6_jars=$(find ${cdh_mapreduce_path}/../../jars -maxdepth 1 \ - -name "woodstox-core-*.jar" -o -name "commons-configuration2-*.jar" -o -name "re2j-*.jar") - fi -fi - -jar_list="${common_jars} ${hdfs_jars} ${mr_jars} ${yarn_jars} ${other_jars} ${cdh6_jars}" - -echo "Find platform specific jars:${jar_list}, will replace with these jars under ${SPARK_HOME}/jars." 
- -if [ $is_cdh6 == 1 ]; then - find ${KYLIN_HOME}/spark/jars -name "hadoop-hdfs-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-yarn-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-mapreduce-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-annotations-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-auth-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-client-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hadoop-common-*.jar" -exec rm -f {} \; - find ${KYLIN_HOME}/spark/jars -name "hive-exec-*.jar" -exec rm -f {} \; - if [ -d "${KYLIN_HOME}/bin/hadoop3_jars/cdh6" ]; then - echo "Copy jars from ${KYLIN_HOME}/bin/hadoop3_jars/cdh6" - cp ${KYLIN_HOME}/bin/hadoop3_jars/cdh6/*.jar ${SPARK_HOME}/jars - fi -fi - -for jar_file in ${jar_list}; do - $(cp ${jar_file} ${KYLIN_HOME}/spark/jars) -done - -# Remove all spaces -jar_list=${jar_list// /} - -if [ -z "${jar_list}" ]; then - echo "Please confirm that the corresponding hadoop jars have been replaced. The automatic replacement program cannot be executed correctly." -else - echo "Replace jars under SPARK_HOME/jars finished." - touch ${BYPASS} -fi - -echo "Done hadoop jars replacement under ${SPARK_HOME}/jars." \ No newline at end of file diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6246b94..19f84a2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1129,7 +1129,7 @@ public abstract class KylinConfigBase implements Serializable { } public String getHiveClientMode() { - return getOptional("kylin.source.hive.client", "cli"); + return getOptional("kylin.source.hive.client", "spark_catalog"); } public String getHiveBeelineShell() { diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java deleted file mode 100644 index a1b78db..0000000 --- a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.common.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.io.FileUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class HiveCmdBuilderTest { - - @Before - public void setup() { - System.setProperty("log4j.configuration", "file:../build/conf/kylin-tools-log4j.properties"); - System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA); - } - - @After - public void after() throws Exception { - System.clearProperty("kylin.source.hive.client"); - System.clearProperty("kylin.source.hive.beeline-shell"); - System.clearProperty("kylin.source.hive.beeline-params"); - - System.clearProperty("kylin.source.hive.enable-sparksql-for-table-ops"); - System.clearProperty("kylin.source.hive.sparksql-beeline-shell"); - System.clearProperty("kylin.source.hive.sparksql-beeline-params"); - } - - @Test - public void testHiveCLI() { - System.setProperty("kylin.source.hive.client", "cli"); - - Map<String, String> hiveProps = new HashMap<>(); - hiveProps.put("hive.execution.engine", "mr"); - Map<String, String> hivePropsOverwrite = new HashMap<>(); - hivePropsOverwrite.put("hive.execution.engine", "tez"); - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder("test HiveCLI"); - hiveCmdBuilder.addStatement("USE default;"); - hiveCmdBuilder.addStatement("DROP TABLE `test`;"); - hiveCmdBuilder.addStatement("SHOW\n TABLES;"); - hiveCmdBuilder.setHiveConfProps(hiveProps); - hiveCmdBuilder.overwriteHiveProps(hivePropsOverwrite); - assertEquals( - "hive -e \"set mapred.job.name='test HiveCLI';\nUSE default;\nDROP TABLE \\`test\\`;\nSHOW\n TABLES;\n\" --hiveconf hive.execution.engine=tez", - hiveCmdBuilder.build()); - } - - @Test - public void testBeeline() throws IOException { - String lineSeparator = java.security.AccessController - .doPrivileged(new sun.security.action.GetPropertyAction("line.separator")); - System.setProperty("kylin.source.hive.client", "beeline"); - System.setProperty("kylin.source.hive.beeline-shell", "/spark-client/bin/beeline"); - System.setProperty("kylin.source.hive.beeline-params", "-u jdbc_url"); - - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement("USE default;"); - hiveCmdBuilder.addStatement("DROP TABLE `test`;"); - hiveCmdBuilder.addStatement("SHOW TABLES;"); - - String cmd = hiveCmdBuilder.build(); - String hqlFile = cmd.substring(cmd.lastIndexOf("-f ") + 3).trim(); - hqlFile = hqlFile.substring(0, hqlFile.length() - ";exit $ret_code".length()); - String createFileCmd = cmd.substring(0, cmd.indexOf("EOL\n", cmd.indexOf("EOL\n") + 1) + 3); - CliCommandExecutor cliCommandExecutor = new CliCommandExecutor(); - Pair<Integer, String> execute = cliCommandExecutor.execute(createFileCmd); - String hqlStatement = FileUtils.readFileToString(new File(hqlFile), Charset.defaultCharset()); - assertEquals( - "USE default;" + lineSeparator + "DROP TABLE `test`;" + lineSeparator + "SHOW TABLES;" + lineSeparator, - hqlStatement); - assertBeelineCmd(cmd); - FileUtils.forceDelete(new File(hqlFile)); - } - - @Test - public void testSparkSqlForTableOps() throws IOException { - System.setProperty("kylin.source.hive.enable-sparksql-for-table-ops", "true"); - System.setProperty("kylin.source.hive.sparksql-beeline-shell", "/spark-client/bin/beeline"); - 
System.setProperty("kylin.source.hive.sparksql-beeline-params", "-u jdbc_url"); - - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement("USE default;"); - hiveCmdBuilder.addStatement("DROP TABLE `test`;"); - hiveCmdBuilder.addStatement("SHOW TABLES;"); - String cmd = hiveCmdBuilder.build(); - assertBeelineCmd(cmd); - } - - private void assertBeelineCmd(String cmd) { - String beelineCmd = cmd.substring(cmd.indexOf("EOL\n", cmd.indexOf("EOL\n") + 1) + 4); - assertTrue(beelineCmd.startsWith("/spark-client/bin/beeline -u jdbc_url")); - } -} diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java index d544059..8f5bb9b 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class ClassLoaderUtils { - static URLClassLoader sparkClassLoader = null; static URLClassLoader originClassLoader = null; private static Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class); @@ -40,25 +39,6 @@ public final class ClassLoaderUtils { return null; } - public static ClassLoader getSparkClassLoader() { - if (sparkClassLoader == null) { - return Thread.currentThread().getContextClassLoader(); - } else { - return sparkClassLoader; - } - } - - public static void setSparkClassLoader(URLClassLoader classLoader) { - if (sparkClassLoader != null) { - logger.error("sparkClassLoader already initialized"); - } - logger.info("set sparkClassLoader :" + classLoader); - if (System.getenv("DEBUG_SPARK_CLASSLOADER") != null) { - return; - } - sparkClassLoader = classLoader; - } - public static ClassLoader getOriginClassLoader() { if (originClassLoader == null) { logger.error("originClassLoader not init"); diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java deleted file mode 100644 index 2dc0292..0000000 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.spark.classloader; - -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.HashSet; -import java.util.Set; - -public class SparkClassLoader extends URLClassLoader { - //preempt these classes from parent - private static String[] SPARK_CL_PREEMPT_CLASSES = new String[] {"org.apache.spark", "scala.", - "org.spark_project", "com.esotericsoftware.kryo"}; - - //preempt these files from parent - private static String[] SPARK_CL_PREEMPT_FILES = new String[] {"spark-version-info.properties", "HiveClientImpl", - "org/apache/spark"}; - - //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should NOT use parent's first - private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] {"javax.ws.rs", "org.apache.hadoop.hive"}; - - //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should use parent's first - private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] { - // Java standard library: - "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.", - // logging - "org.apache.commons.logging", "org.apache.log4j", "org.slf4j", "org.apache.hadoop", - // Hadoop/ZK: - "org.apache.kylin", "com.intellij", "org.apache.calcite"}; - - private static final Set<String> classNotFoundCache = new HashSet<>(); - private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class); - - static { - String sparkClassLoaderSparkClPreemptClasses = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES"); - if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptClasses)) { - SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkClassLoaderSparkClPreemptClasses, ","); - } - - String sparkClassLoaderSparkClPreemptFiles = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES"); - if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptFiles)) { - SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkClassLoaderSparkClPreemptFiles, ","); - } - - String sparkClassLoaderThisClPrecedentClasses = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(sparkClassLoaderThisClPrecedentClasses)) { - THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderThisClPrecedentClasses, ","); - } - - String sparkClassLoaderParentClPrecedentClasses = System - .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES"); - if (!StringUtils.isEmpty(sparkClassLoaderParentClPrecedentClasses)) { - PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderParentClPrecedentClasses, ","); - } - - try { - final Method registerParallel = ClassLoader.class.getDeclaredMethod("registerAsParallelCapable"); - AccessController.doPrivileged(new PrivilegedAction<Object>() { - public Object run() { - registerParallel.setAccessible(true); - return null; - } - }); - Boolean result = (Boolean) registerParallel.invoke(null); - if (!result) { - logger.warn("registrationFailed"); - } - } catch (Exception ignore) { - - } - } - - /** - * Creates a DynamicClassLoader that can load classes dynamically - * from jar files under a specific folder. - * - * @param parent the parent ClassLoader to set. 
- */ - protected SparkClassLoader(ClassLoader parent) throws IOException { - super(new URL[] {}, parent); - init(); - } - - public void init() throws MalformedURLException { - String sparkHome = System.getenv("SPARK_HOME"); - if (sparkHome == null) { - sparkHome = System.getProperty("SPARK_HOME"); - if (sparkHome == null) { - throw new RuntimeException( - "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); - } - } - File file = new File(sparkHome + "/jars"); - File[] jars = file.listFiles(); - for (File jar : jars) { - addURL(jar.toURI().toURL()); - } - } - - @Override - public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - - if (needToUseGlobal(name)) { - logger.debug("delegate " + name + " directly to parent"); - return super.loadClass(name, resolve); - } - return doLoadclass(name); - } - - private Class<?> doLoadclass(String name) throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - // Check whether the class has already been loaded: - Class<?> clasz = findLoadedClass(name); - if (clasz != null) { - logger.debug("Class " + name + " already loaded"); - } else { - try { - // Try to find this class using the URLs passed to this ClassLoader - logger.debug("Finding class: " + name); - clasz = super.findClass(name); - if (clasz == null) { - logger.debug("cannot find class" + name); - } - } catch (ClassNotFoundException e) { - classNotFoundCache.add(name); - // Class not found using this ClassLoader, so delegate to parent - logger.debug("Class " + name + " not found - delegating to parent"); - try { - // sparder and query module has some class start with org.apache.spark, - // We need to use some lib that does not exist in spark/jars - clasz = getParent().loadClass(name); - } catch (ClassNotFoundException e2) { - // Class not found in this ClassLoader or in the parent ClassLoader - // Log some debug output before re-throwing ClassNotFoundException - logger.debug("Class " + name + " not found in parent loader"); - throw e2; - } - } - } - return clasz; - } - } - - private boolean isThisCLPrecedent(String name) { - for (String exemptPrefix : THIS_CL_PRECEDENT_CLASSES) { - if (name.startsWith(exemptPrefix)) { - return true; - } - } - return false; - } - - private boolean isParentCLPrecedent(String name) { - for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) { - if (name.startsWith(exemptPrefix)) { - return true; - } - } - return false; - } - - private boolean needToUseGlobal(String name) { - return !isThisCLPrecedent(name) && !classNeedPreempt(name) && isParentCLPrecedent(name); - } - - boolean classNeedPreempt(String name) { - if (classNotFoundCache.contains(name)) { - return false; - } - for (String exemptPrefix : SPARK_CL_PREEMPT_CLASSES) { - if (name.startsWith(exemptPrefix)) { - return true; - } - } - return false; - } - - boolean fileNeedPreempt(String name) { - for (String exemptPrefix : SPARK_CL_PREEMPT_FILES) { - if (name.contains(exemptPrefix)) { - return true; - } - } - return false; - } -} diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java index f403dbb..024d628 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java +++ 
b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java @@ -58,7 +58,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { } private static Logger logger = LoggerFactory.getLogger(TomcatClassLoader.class); - private SparkClassLoader sparkClassLoader; /** * Creates a DynamicClassLoader that can load classes dynamically @@ -68,8 +67,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { */ public TomcatClassLoader(ClassLoader parent) throws IOException { super(parent); - sparkClassLoader = new SparkClassLoader(this); - ClassLoaderUtils.setSparkClassLoader(sparkClassLoader); ClassLoaderUtils.setOriginClassLoader(this); init(); } @@ -104,9 +101,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { if (name.startsWith("org.apache.kylin.spark.classloader")) { return parent.loadClass(name); } - if (sparkClassLoader.classNeedPreempt(name)) { - return sparkClassLoader.loadClass(name); - } if (isParentCLPrecedent(name)) { logger.debug("Skipping exempt class " + name + " - delegating directly to parent"); return parent.loadClass(name); @@ -116,9 +110,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { @Override public InputStream getResourceAsStream(String name) { - if (sparkClassLoader.fileNeedPreempt(name)) { - return sparkClassLoader.getResourceAsStream(name); - } return super.getResourceAsStream(name); } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala index 47d0c9b..2061257 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala @@ -54,7 +54,7 @@ object SparderContext extends Logging { @volatile var master_app_url: String = _ - def getOriginalSparkSession: SparkSession = withClassLoad { + def getOriginalSparkSession: SparkSession = { if (spark == null || spark.sparkContext.isStopped) { logInfo("Init spark.") initSpark() @@ -92,7 +92,7 @@ object SparderContext extends Logging { spark != null && !spark.sparkContext.isStopped } - def restartSpark(): Unit = withClassLoad { + def restartSpark(): Unit = { this.synchronized { if (spark != null && !spark.sparkContext.isStopped) { Utils.tryWithSafeFinally { @@ -107,7 +107,7 @@ object SparderContext extends Logging { } } - def stopSpark(): Unit = withClassLoad { + def stopSpark(): Unit = { this.synchronized { if (spark != null && !spark.sparkContext.isStopped) { Utils.tryWithSafeFinally { @@ -119,7 +119,7 @@ object SparderContext extends Logging { } } - def init(): Unit = withClassLoad { + def init(): Unit = { getOriginalSparkSession } @@ -127,7 +127,7 @@ object SparderContext extends Logging { getSparkSession.sparkContext.conf.get(key) } - def initSpark(): Unit = withClassLoad { + def initSpark(): Unit = this.synchronized { if (initializingThread == null && (spark == null || spark.sparkContext.isStopped)) { initializingThread = new Thread(new Runnable { @@ -219,7 +219,6 @@ object SparderContext extends Logging { // init FileStatusCache ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession) } - } def registerListener(sc: SparkContext): Unit = { val sparkListener = new SparkListener { @@ -250,21 +249,6 @@ object SparderContext extends Logging { logicalPlan } - /** - * To avoid spark being affected by the 
environment, we use spark classloader load spark. - * - * @param body Somewhere if you use spark - * @tparam T Action function - * @return The body return - */ - def withClassLoad[T](body: => T): T = { - // val originClassLoad = Thread.currentThread().getContextClassLoader - Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader) - val t = body - // Thread.currentThread().setContextClassLoader(originClassLoad) - t - } - val _isAsyncQuery = new ThreadLocal[JBoolean] val _separator = new ThreadLocal[JString] val _df = new ThreadLocal[Dataset[Row]] diff --git a/source-hive/pom.xml b/source-hive/pom.xml index 8430796..895db29 100644 --- a/source-hive/pom.xml +++ b/source-hive/pom.xml @@ -37,7 +37,11 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-build-engine</artifactId> </dependency> - + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-spark-query</artifactId> + <version>${project.version}</version> + </dependency> <!-- Env & Test --> <dependency> <groupId>com.h2database</groupId> @@ -66,6 +70,11 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.mrunit</groupId> <artifactId>mrunit</artifactId> <classifier>hadoop2</classifier> diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java deleted file mode 100644 index 669d66e..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.hive; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.util.DBUtils; -import org.apache.kylin.common.util.SourceConfigurationUtil; - -import org.apache.kylin.shaded.com.google.common.base.Preconditions; -import org.apache.kylin.shaded.com.google.common.collect.Lists; - -public class BeelineHiveClient implements IHiveClient { - - private static final String HIVE_AUTH_USER = "user"; - private static final String HIVE_AUTH_PASSWD = "password"; - private Connection cnct; - private Statement stmt; - private DatabaseMetaData metaData; - - public BeelineHiveClient(String beelineParams) { - if (StringUtils.isEmpty(beelineParams)) { - throw new IllegalArgumentException("BeelineParames cannot be empty"); - } - String[] splits = StringUtils.split(beelineParams); - String url = "", username = "", password = ""; - for (int i = 0; i < splits.length - 1; i++) { - if ("-u".equals(splits[i])) { - url = stripQuotes(splits[i + 1]); - } - if ("-n".equals(splits[i])) { - username = stripQuotes(splits[i + 1]); - } - if ("-p".equals(splits[i])) { - password = stripQuotes(splits[i + 1]); - } - if ("-w".equals(splits[i])) { - File file = new File(splits[i + 1]); - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)); - try { - password = br.readLine(); - } finally { - if (null != br) { - br.close(); - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - Properties jdbcProperties = SourceConfigurationUtil.loadHiveJDBCProperties(); - jdbcProperties.put(HIVE_AUTH_PASSWD, password); - jdbcProperties.put(HIVE_AUTH_USER, username); - this.init(url, jdbcProperties); - } - - private void init(String url, Properties hiveProperties) { - try { - Class.forName("org.apache.hive.jdbc.HiveDriver"); - cnct = DriverManager.getConnection(url, hiveProperties); - stmt = cnct.createStatement(); - metaData = cnct.getMetaData(); - } catch (SQLException | ClassNotFoundException e) { - throw new RuntimeException(e); - } - } - - private String stripQuotes(String input) { - if (input.startsWith("'") && input.endsWith("'")) { - return StringUtils.strip(input, "'"); - } else if (input.startsWith("\"") && input.endsWith("\"")) { - return StringUtils.strip(input, "\""); - } else { - return input; - } - } - - public List<String> getHiveDbNames() throws Exception { - List<String> ret = Lists.newArrayList(); - ResultSet schemas = metaData.getSchemas(); - while (schemas.next()) { - ret.add(String.valueOf(schemas.getObject(1))); - } - DBUtils.closeQuietly(schemas); - return ret; - } - - public List<String> getHiveTableNames(String database) throws Exception { - List<String> ret = Lists.newArrayList(); - ResultSet tables = metaData.getTables(null, database, null, null); - while (tables.next()) { - ret.add(String.valueOf(tables.getObject(3))); - } - DBUtils.closeQuietly(tables); - return ret; - } - - @Override - public long getHiveTableRows(String database, String tableName) throws Exception { - ResultSet resultSet = 
null; - long count = 0; - try { - String query = "select count(*) from "; - resultSet = stmt.executeQuery(query.concat(database + "." + tableName)); - if (resultSet.next()) { - count = resultSet.getLong(1); - } - } finally { - DBUtils.closeQuietly(resultSet); - } - return count; - } - - @Override - public List<Object[]> getHiveResult(String hql) throws Exception { - ResultSet resultSet = null; - List<Object[]> datas = new ArrayList<>(); - try { - resultSet = stmt.executeQuery(hql); - int columnCtn = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) { - Object[] data = new Object[columnCtn]; - for (int i = 0; i < columnCtn; i++) { - data[i] = resultSet.getObject(i + 1); - } - datas.add(data); - } - } finally { - DBUtils.closeQuietly(resultSet); - } - return datas; - } - - @Override - public void executeHQL(String hql) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void executeHQL(String[] hqls) throws IOException { - throw new UnsupportedOperationException(); - } - - public HiveTableMeta getHiveTableMeta(String database, String tableName) throws SQLException { - ResultSet columns = metaData.getColumns(null, database, tableName, null); - HiveTableMetaBuilder builder = new HiveTableMetaBuilder(); - builder.setTableName(tableName); - - List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList(); - while (columns.next()) { - String columnName = columns.getString(4); - String dataType = columns.getString(6); - String comment = columns.getString(12); - dataType = considerDataTypePrecision(dataType, columns.getString(7), columns.getString(9)); - allColumns.add(new HiveTableMeta.HiveTableColumnMeta(columnName, dataType, comment)); - } - builder.setAllColumns(allColumns); - DBUtils.closeQuietly(columns); - String exe = "use "; - stmt.execute(exe.concat(database)); - String des = "describe formatted "; - ResultSet resultSet = stmt.executeQuery(des.concat(tableName)); - extractHiveTableMeta(resultSet, builder); - DBUtils.closeQuietly(resultSet); - return builder.createHiveTableMeta(); - } - - public static String considerDataTypePrecision(String dataType, String precision, String scale) { - if ("VARCHAR".equalsIgnoreCase(dataType) || "CHAR".equalsIgnoreCase(dataType)) { - if (null != precision) - dataType = new StringBuilder(dataType).append("(").append(precision).append(")").toString(); - } - if ("DECIMAL".equalsIgnoreCase(dataType) || "NUMERIC".equalsIgnoreCase(dataType)) { - if (precision != null && scale != null) - dataType = new StringBuilder(dataType).append("(").append(precision).append(",").append(scale) - .append(")").toString(); - } - return dataType; - } - - private void extractHiveTableMeta(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException { - while (resultSet.next()) { - parseResultEntry(resultSet, builder); - } - } - - private void parseResultEntry(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException { - List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList(); - if ("# Partition Information".equals(resultSet.getString(1).trim())) { - resultSet.next(); - Preconditions.checkArgument("# col_name".equals(resultSet.getString(1).trim())); - resultSet.next(); - Preconditions.checkArgument("".equals(resultSet.getString(1).trim())); - while (resultSet.next()) { - if ("".equals(resultSet.getString(1).trim())) { - break; - } - partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(resultSet.getString(1).trim(), - resultSet.getString(2).trim(), 
resultSet.getString(3).trim())); - } - builder.setPartitionColumns(partitionColumns); - } - - if ("Owner:".equals(resultSet.getString(1).trim())) { - builder.setOwner(resultSet.getString(2).trim()); - } - if ("LastAccessTime:".equals(resultSet.getString(1).trim())) { - try { - int i = Integer.parseInt(resultSet.getString(2).trim()); - builder.setLastAccessTime(i); - } catch (NumberFormatException e) { - builder.setLastAccessTime(0); - } - } - if ("Location:".equals(resultSet.getString(1).trim())) { - builder.setSdLocation(resultSet.getString(2).trim()); - } - if ("Table Type:".equals(resultSet.getString(1).trim())) { - builder.setTableType(resultSet.getString(2).trim()); - } - if ("Table Parameters:".equals(resultSet.getString(1).trim())) { - extractTableParam(resultSet, builder); - } - if ("InputFormat:".equals(resultSet.getString(1).trim())) { - builder.setSdInputFormat(resultSet.getString(2).trim()); - } - if ("OutputFormat:".equals(resultSet.getString(1).trim())) { - builder.setSdOutputFormat(resultSet.getString(2).trim()); - } - } - - private void extractTableParam(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException { - while (resultSet.next()) { - if (resultSet.getString(2) == null) { - break; - } - if ("storage_handler".equals(resultSet.getString(2).trim())) { - builder.setIsNative(false);//default is true - } - if ("totalSize".equals(resultSet.getString(2).trim())) { - builder.setFileSize(Long.parseLong(resultSet.getString(3).trim()));//default is false - } - if ("numFiles".equals(resultSet.getString(2).trim())) { - builder.setFileNum(Long.parseLong(resultSet.getString(3).trim())); - } - if ("skip.header.line.count".equals(resultSet.getString(2).trim())) { - builder.setSkipHeaderLineCount(resultSet.getString(3).trim()); - } - } - } - - public void close() { - DBUtils.closeQuietly(stmt); - DBUtils.closeQuietly(cnct); - } - - public static void main(String[] args) throws SQLException { - - BeelineHiveClient loader = new BeelineHiveClient( - "-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'"); - //BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " ")); - HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part"); - System.out.println(hiveTableMeta); - loader.close(); - } -} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java deleted file mode 100644 index 361a3c6..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.hive; - -import org.apache.kylin.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HiveCmdBuilder; -import org.apache.kylin.common.util.Pair; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * Hive meta API client for Kylin - * @author shaoshi - * - */ -public class CLIHiveClient implements IHiveClient { - protected HiveConf hiveConf = null; - protected IMetaStoreClient metaStoreClient = null; - - public CLIHiveClient() { - hiveConf = new HiveConf(CLIHiveClient.class); - } - - /** - * only used by Deploy Util - * @throws IOException - */ - @Override - public void executeHQL(String hql) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement(hql); - Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor() - .execute(hiveCmdBuilder.toString()); - if (response.getFirst() != 0) { - throw new IllegalArgumentException("Failed to execute hql [" + hql + "], error message is: " + response.getSecond()); - } - - } - - /** - * only used by Deploy Util - */ - @Override - public void executeHQL(String[] hqls) throws IOException { - for (String sql : hqls) - executeHQL(sql); - } - - @Override - public HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception { - HiveTableMetaBuilder builder = new HiveTableMetaBuilder(); - Table table = getMetaStoreClient().getTable(database, tableName); - - List<FieldSchema> allFields = getMetaStoreClient().getFields(database, tableName); - List<FieldSchema> partitionFields = table.getPartitionKeys(); - if (allFields == null) { - allFields = Lists.newArrayList(); - } - if (partitionFields != null && partitionFields.size() > 0) { - allFields.addAll(partitionFields); - } - List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList(); - List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList(); - for (FieldSchema fieldSchema : allFields) { - allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment())); - } - if (partitionFields != null && partitionFields.size() > 0) { - for (FieldSchema fieldSchema : partitionFields) { - partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment())); - } - } - builder.setAllColumns(allColumns); - builder.setPartitionColumns(partitionColumns); - - builder.setSdLocation(table.getSd().getLocation()); - builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE)); - builder.setFileNum(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES)); - builder.setIsNative(!MetaStoreUtils.isNonNativeTable(table)); - builder.setTableName(tableName); - builder.setSdInputFormat(table.getSd().getInputFormat()); - builder.setSdOutputFormat(table.getSd().getOutputFormat()); - builder.setOwner(table.getOwner()); - builder.setLastAccessTime(table.getLastAccessTime()); - 
builder.setTableType(table.getTableType()); - builder.setSkipHeaderLineCount(table.getParameters().get("skip.header.line.count")); - - return builder.createHiveTableMeta(); - } - - @Override - public List<String> getHiveDbNames() throws Exception { - return getMetaStoreClient().getAllDatabases(); - } - - @Override - public List<String> getHiveTableNames(String database) throws Exception { - return getMetaStoreClient().getAllTables(database); - } - - @Override - public long getHiveTableRows(String database, String tableName) throws Exception { - Table table = getMetaStoreClient().getTable(database, tableName); - return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.ROW_COUNT); - } - - @Override - public List<Object[]> getHiveResult(String hql) throws Exception { - List<Object[]> data = new ArrayList<>(); - - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement(hql); - Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor().execute(hiveCmdBuilder.toString()); - - String[] respData = response.getSecond().split("\n"); - - boolean isData = false; - - for (String item : respData) { - if (item.trim().equalsIgnoreCase("OK")) { - isData = true; - continue; - } - if (item.trim().startsWith("Time taken")) { - isData = false; - } - if (isData) { - Object[] arr = item.split("\t"); - data.add(arr); - } - - } - - return data; - } - - private IMetaStoreClient getMetaStoreClient() throws Exception { - if (metaStoreClient == null) { - metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf); - } - return metaStoreClient; - } - - /** - * COPIED FROM org.apache.hadoop.hive.ql.stats.StatsUtil for backward compatibility - * <p> - * Get basic stats of table - * - * @param table - table - * @param statType - type of stats - * @return value of stats - */ - private long getBasicStatForTable(org.apache.hadoop.hive.ql.metadata.Table table, String statType) { - Map<String, String> params = table.getParameters(); - long result = 0; - - if (params != null) { - try { - result = Long.parseLong(params.get(statType)); - } catch (NumberFormatException e) { - result = 0; - } - } - return result; - } -} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java index 4687973..74e1e8a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java @@ -23,10 +23,8 @@ import org.apache.kylin.common.KylinConfig; public class HiveClientFactory { public static IHiveClient getHiveClient() { - if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { - return new CLIHiveClient(); - } else if ("beeline".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { - return new BeelineHiveClient(KylinConfig.getInstanceFromEnv().getHiveBeelineParams()); + if ("spark_catalog".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { + return new SparkHiveClient(); } else { throw new RuntimeException("cannot recognize hive client mode"); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java index 9a26c14..4327b90 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java +++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java @@ -45,14 +45,14 @@ public class HiveTableMeta { String owner; String tableType; int skipHeaderLineCount; - int lastAccessTime; + long lastAccessTime; long fileSize; long fileNum; boolean isNative; List<HiveTableColumnMeta> allColumns; List<HiveTableColumnMeta> partitionColumns; - public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) { + public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, long lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) { this.tableName = tableName; this.sdLocation = sdLocation; this.sdInputFormat = sdInputFormat; diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java index 0f34224..5238fd3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java @@ -29,7 +29,7 @@ public class HiveTableMetaBuilder { private String sdOutputFormat; private String owner; private String tableType; - private int lastAccessTime; + private long lastAccessTime; private long fileSize; private long fileNum; private int skipHeaderLineCount; @@ -67,7 +67,7 @@ public class HiveTableMetaBuilder { return this; } - public HiveTableMetaBuilder setLastAccessTime(int lastAccessTime) { + public HiveTableMetaBuilder setLastAccessTime(long lastAccessTime) { this.lastAccessTime = lastAccessTime; return this; } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java new file mode 100644 index 0000000..26b97d7 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.kylin.source.hive; + +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.spark.sql.SparderContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.catalyst.catalog.CatalogTableType; +import org.apache.spark.sql.catalyst.catalog.SessionCatalog; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import scala.Option; +import scala.collection.Iterator; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +public class SparkHiveClient implements IHiveClient { + //key in hive metadata map + private static final String CHAR_VARCHAR_TYPE_STRING = "__CHAR_VARCHAR_TYPE_STRING"; + private static final String HIVE_COMMENT = "comment"; + private static final String HIVE_TABLE_ROWS = "numRows"; + private static final String TABLE_TOTAL_SIZE = "totalSize"; + private static final String TABLE_FILE_NUM = "numFiles"; + + protected SparkSession ss; + protected SessionCatalog catalog; + + public SparkHiveClient() { + ss = SparderContext.getOriginalSparkSession(); + catalog = ss.sessionState().catalog(); + } + + + @Override + public void executeHQL(String hql) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void executeHQL(String[] hqls) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception { + HiveTableMetaBuilder builder = new HiveTableMetaBuilder(); + CatalogTable catalogTable = catalog + .getTempViewOrPermanentTableMetadata(new TableIdentifier(tableName, Option.apply(database))); + scala.collection.immutable.List<StructField> structFieldList = catalogTable.schema().toList(); + Iterator<StructField> structFieldIterator = structFieldList.iterator(); + + List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList(); + List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList(); + while (structFieldIterator.hasNext()) { + StructField structField = structFieldIterator.next(); + String name = structField.name(); + String hiveDataType = structField.dataType().simpleString(); + Metadata metadata = structField.metadata(); + String description = metadata.contains(HIVE_COMMENT) ? metadata.getString(HIVE_COMMENT) : ""; + String datatype = metadata.contains(CHAR_VARCHAR_TYPE_STRING) ? 
metadata.getString(CHAR_VARCHAR_TYPE_STRING) : hiveDataType; + + allColumns.add(new HiveTableMeta.HiveTableColumnMeta(name, datatype, description)); + if (catalogTable.partitionColumnNames().contains(name)) { + partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(name, datatype, description)); + } + } + + builder.setAllColumns(allColumns); + builder.setPartitionColumns(partitionColumns); + builder.setSdLocation(catalogTable.location().getPath()); + builder.setFileSize(Long.parseLong(catalogTable.ignoredProperties().apply(TABLE_TOTAL_SIZE))); + builder.setFileNum(Long.parseLong(catalogTable.ignoredProperties().apply(TABLE_FILE_NUM))); + builder.setIsNative(catalogTable.tableType().equals(CatalogTableType.MANAGED())); + builder.setTableName(tableName); + builder.setSdInputFormat(catalogTable.storage().inputFormat().toString()); + builder.setSdOutputFormat(catalogTable.storage().outputFormat().toString()); + builder.setOwner(catalogTable.owner()); + builder.setLastAccessTime(catalogTable.lastAccessTime()); + builder.setTableType(catalogTable.tableType().name()); + + return builder.createHiveTableMeta(); + } + + @Override + public List<String> getHiveDbNames() throws Exception { + return scala.collection.JavaConversions.seqAsJavaList(catalog.listDatabases()); + } + + @Override + public List<String> getHiveTableNames(String database) throws Exception { + List<TableIdentifier> tableIdentifiers = scala.collection.JavaConversions.seqAsJavaList(catalog.listTables(database)); + List<String> tableNames = tableIdentifiers.stream().map(table -> table.table()).collect(Collectors.toList()); + return tableNames; + } + + @Override + public long getHiveTableRows(String database, String tableName) throws Exception { + return Long.parseLong(catalog.getTempViewOrPermanentTableMetadata(new TableIdentifier(tableName, Option.apply(database))) + .ignoredProperties().apply(HIVE_TABLE_ROWS)); + } + + /* + This method was originally used for pushdown query. + The method of pushdown query in kylin4 is PushDownRunnerSparkImpl.executeQuery, so getHiveResult is not implemented here. + */ + @Override + public List<Object[]> getHiveResult(String sql) throws Exception { + return null; + } +} diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java deleted file mode 100644 index 00a25e5..0000000 --- a/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.source.hive; - -import org.junit.Assert; -import org.junit.Test; - -public class BeelineHIveClientTest { - @Test - public void testBasics() { - String dataType = "varchar"; - String precision = "60"; - String scale = null; - dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale); - Assert.assertEquals("varchar(60)", dataType); - - dataType = "char"; - precision = "50"; - scale = null; - dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale); - Assert.assertEquals("char(50)", dataType); - - dataType = "decimal"; - precision = "8"; - scale = "4"; - dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale); - Assert.assertEquals("decimal(8,4)", dataType); - - dataType = "numeric"; - precision = "7"; - scale = "3"; - dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale); - Assert.assertEquals("numeric(7,3)", dataType); - } -}
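
For reference, a minimal usage sketch of the refactored hive client path, written against the classes shown in this diff. It is not part of the commit. It assumes that KylinConfig resolves the hive client mode to "spark_catalog" (the only value the new HiveClientFactory accepts) and that SparderContext already holds a live SparkSession, since SparkHiveClient reads metadata through Spark's SessionCatalog. The class name SparkHiveClientUsageSketch and the table "default.test_kylin_fact" are illustrative only.

    package org.apache.kylin.source.hive;

    // Minimal sketch, not part of this commit. Assumptions:
    //  * the hive client mode is "spark_catalog", so the factory returns SparkHiveClient;
    //  * SparderContext has an active SparkSession (SparkHiveClient uses its SessionCatalog);
    //  * "default.test_kylin_fact" is a hypothetical table whose Hive statistics
    //    (numRows/totalSize/numFiles) have already been computed.
    public class SparkHiveClientUsageSketch {
        public static void main(String[] args) throws Exception {
            // Returns SparkHiveClient in the refactored code path.
            IHiveClient client = HiveClientFactory.getHiveClient();

            // Table metadata now comes from Spark's CatalogTable rather than a
            // Hive JDBC (Beeline) connection or a direct metastore client.
            HiveTableMeta meta = client.getHiveTableMeta("default", "test_kylin_fact");
            long rows = client.getHiveTableRows("default", "test_kylin_fact");

            System.out.println(meta + ", numRows=" + rows);
        }
    }

Note that SparkHiveClient reads numRows, totalSize and numFiles from CatalogTable.ignoredProperties(), so getHiveTableRows and the file statistics only work for tables whose Hive statistics are present in the metastore; tables without computed statistics will fail at that lookup.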