This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
     new 596b8cb  Fix 0328 (#1837)
596b8cb is described below

commit 596b8cb36f37167c1669e6f3d19db636c89eea27
Author: Tengting Xu <34978943+muk...@users.noreply.github.com>
AuthorDate: Wed Mar 30 09:48:56 2022 +0800

    Fix 0328 (#1837)

    * # support different mode of kylin & auto enable glue by the mode of kylin
    * # add log for fetching nodes messages
    * minor fix
    * # minor fix
    * # minor fix
---
 .gitignore                                          |  3 +-
 backup/properties/default/kylin.properties          |  1 +
 .../all}/kylin.properties                           |  1 +
 .../job}/kylin.properties                           | 18 +++--
 .../query}/kylin.properties                         |  8 +-
 .../properties/templates/kylin.properties.template  |  1 +
 backup/scripts/prepare-ec2-env-for-kylin4.sh        | 40 ++++++++++
 clouds/aws.py                                       | 12 ++-
 constant/deployment.py                              |  5 ++
 constant/path.py                                    |  2 +
 constant/yaml_params.py                             |  3 +
 deploy.py                                           | 47 ++++++++++--
 engine.py                                           |  7 +-
 engine_utils.py                                     | 13 +++-
 instances/aws_instance.py                           | 87 ++++++++++++++++++++--
 kylin_configs.yaml                                  |  4 +-
 utils.py                                            | 52 ++++++++++---
 17 files changed, 256 insertions(+), 48 deletions(-)

diff --git a/.gitignore b/.gitignore
index a087ce7..51a5140 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,5 @@ __pycache__/
 .idea
 logs/
 backup/jars
-backup/tars
\ No newline at end of file
+backup/tars
+backup/demos
diff --git a/backup/properties/default/kylin.properties b/backup/properties/default/kylin.properties
index aecaa8f..3cc83dd 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/default/kylin.properties
@@ -44,6 +44,7 @@ kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
 
 ## Query Engine Resource
 kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
 kylin.query.spark-conf.spark.driver.cores=1
 kylin.query.spark-conf.spark.driver.memory=8GB
 kylin.query.spark-conf.spark.driver.memoryOverhead=1G
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/all/kylin.properties
similarity index 97%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/all/kylin.properties
index aecaa8f..3cc83dd 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/all/kylin.properties
@@ -44,6 +44,7 @@ kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
 
 ## Query Engine Resource
 kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
 kylin.query.spark-conf.spark.driver.cores=1
 kylin.query.spark-conf.spark.driver.memory=8GB
 kylin.query.spark-conf.spark.driver.memoryOverhead=1G
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/job/kylin.properties
similarity index 86%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/job/kylin.properties
index aecaa8f..3bbe87b 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/job/kylin.properties
@@ -17,7 +17,7 @@
 
 # Kylin server mode, valid value [all, query, job]
-kylin.server.mode=all
+kylin.server.mode=job
 kylin.metadata.url=kylin_metadata@jdbc,url=jdbc:mysql://{{ DB_HOST }}:{{ DB_PORT }}/kylin,username=root,password={{ DB_PASSWORD }},maxActive=10,maxIdle=10
 kylin.env.zookeeper-connect-string={{ ZOOKEEPER_HOST }}
 kylin.env.hdfs-working-dir=s3a:/{{ S3_BUCKET_PATH }}/working_dir/
@@ -27,8 +27,10 @@ kylin.engine.spark-conf.spark.history.fs.logDirectory=s3a:/{{ S3_BUCKET_PATH }}/
 kylin.engine.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
 kylin.cube.cubeplanner.enabled=false
+
+## Job Engine parameters
 kylin.engine.spark-conf.spark.executor.cores=2
-kylin.engine.spark-conf.spark.executor.instances=4
+kylin.engine.spark-conf.spark.executor.instances=5
 kylin.engine.spark-conf.spark.executor.memory=7GB
 kylin.engine.spark-conf.spark.executor.memoryOverhead=1GB
@@ -45,12 +47,12 @@ kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
 
 ## Query Engine Resource
 kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
 kylin.query.spark-conf.spark.driver.cores=1
-kylin.query.spark-conf.spark.driver.memory=8GB
-kylin.query.spark-conf.spark.driver.memoryOverhead=1G
-kylin.query.spark-conf.spark.executor.instances=2
-kylin.query.spark-conf.spark.executor.cores=2
-kylin.query.spark-conf.spark.executor.memory=7G
-kylin.query.spark-conf.spark.executor.memoryOverhead=1G
+kylin.query.spark-conf.spark.driver.memory=512M
+kylin.query.spark-conf.spark.driver.memoryOverhead=256M
+kylin.query.spark-conf.spark.cores.max=1
+kylin.query.spark-conf.spark.executor.memory=512M
+kylin.query.spark-conf.spark.executor.memoryOverhead=256M
+
 kylin.query.spark-conf.spark.sql.parquet.filterPushdown=false
 kylin.query.spark-conf.spark.hadoop.parquet.filter.columnindex.enabled=true
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/query/kylin.properties
similarity index 95%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/query/kylin.properties
index aecaa8f..d1cb3e0 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/query/kylin.properties
@@ -17,7 +17,7 @@
 
 # Kylin server mode, valid value [all, query, job]
-kylin.server.mode=all
+kylin.server.mode=query
 kylin.metadata.url=kylin_metadata@jdbc,url=jdbc:mysql://{{ DB_HOST }}:{{ DB_PORT }}/kylin,username=root,password={{ DB_PASSWORD }},maxActive=10,maxIdle=10
 kylin.env.zookeeper-connect-string={{ ZOOKEEPER_HOST }}
 kylin.env.hdfs-working-dir=s3a:/{{ S3_BUCKET_PATH }}/working_dir/
@@ -58,11 +58,11 @@ kylin.query.spark-conf.spark.hadoop.parquet.filter.columnindex.enabled=true
 kylin.query.spark-conf.spark.ui.prometheus.enabled=true
 kylin.query.spark-conf.spark.executor.processTreeMetrics.enabled=true
 
-## Disable canary
-kylin.canary.sparder-context-canary-enabled=false
+## Enable canary
+kylin.canary.sparder-context-canary-enabled=true
 
 ## Query Cache
 kylin.query.cache-enabled=true
 
 ### Prepare for cluster mode
 kylin.job.scheduler.default=100
-kylin.server.self-discovery-enabled=true
\ No newline at end of file
+kylin.server.self-discovery-enabled=true
diff --git a/backup/properties/templates/kylin.properties.template b/backup/properties/templates/kylin.properties.template
index aecaa8f..3cc83dd 100644
--- a/backup/properties/templates/kylin.properties.template
+++ b/backup/properties/templates/kylin.properties.template
@@ -44,6 +44,7 @@ kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
 
 ## Query Engine Resource
 kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
 kylin.query.spark-conf.spark.driver.cores=1
 kylin.query.spark-conf.spark.driver.memory=8GB
 kylin.query.spark-conf.spark.driver.memoryOverhead=1G
diff --git a/backup/scripts/prepare-ec2-env-for-kylin4.sh b/backup/scripts/prepare-ec2-env-for-kylin4.sh
index bfdfdfa..6428d84 100644
--- a/backup/scripts/prepare-ec2-env-for-kylin4.sh
+++ b/backup/scripts/prepare-ec2-env-for-kylin4.sh
@@ -194,6 +194,10 @@ HIVE_PACKAGE=apache-hive-${HIVE_VERSION}-bin.tar.gz
 NODE_EXPORTER_PACKAGE=node_exporter-1.3.1.linux-amd64.tar.gz
 MDX_PACKAGE=mdx-kylin-${MDX_VERSION}.tar.gz
 
+# Glue needed files
+GLUE_META=meta_backups.tgz
+GLUE_SQL=create_kylin_demo_table.sql
+
 ### Parameter for JDK 1.8
 JDK_PACKAGE=jdk-8u301-linux-x64.tar.gz
 JDK_DECOMPRESS_NAME=jdk1.8.0_301
@@ -206,6 +210,11 @@ function init_env() {
     mkdir ${HADOOP_DIR}
   fi
 
+  DEMO_DIR=${HOME_DIR}/kylin_demo
+  if [[ ! -d $DEMO_DIR ]]; then
+    mkdir ${DEMO_DIR}
+  fi
+
   JAVA_HOME=/usr/local/java
   JRE_HOME=${JAVA_HOME}/jre
   HADOOP_HOME=${HADOOP_DIR}/hadoop-${HADOOP_VERSION}
@@ -585,6 +594,19 @@ master.sink.prometheusServlet.path=/metrics/master/prometheus
 applications.sink.prometheusServlet.path=/metrics/applications/prometheus
 EOF
 
+
+  # Support to customize spark-defaults.conf
+  cat <<EOF > ${SPARK_HOME}/conf/spark-defaults.conf
+spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.endpoint s3.${CURRENT_REGION}.amazonaws.com.cn
+spark.master spark://${spark_master_node_private_ip}:7077
+spark.driver.cores 1
+spark.driver.memory 8G
+spark.driver.memoryOverhead 1G
+spark.executor.memory 7G
+spark.executor.memoryOverhead 1G
+EOF
+
   logging info "Spark inited ..."
   touch ${HOME_DIR}/.inited_spark
   logging info "Spark is ready ..."
@@ -891,6 +913,23 @@ function start_node_exporter() {
   nohup ${NODE_EXPORTER_HOME}/node_exporter >>${NODE_EXPORTER_HOME}/node.log 2>&1 &
 }
 
+function download_glue_meta() {
+  if [[ ! -f ${DEMO_DIR}/$GLUE_META ]]; then
+    logging info "Glue Meta package ${GLUE_META} not downloaded, downloading it ..."
+    aws s3 cp ${PATH_TO_BUCKET}/kylin_demo/${GLUE_META} ${DEMO_DIR} --region ${CURRENT_REGION}
+  fi
+
+  if [[ ! -d ${HOME_DIR}/kylin_demo/${GLUE_META%*.tgz} ]]; then
+    logging info "Decompressing package ${GLUE_META} ..."
+    tar -zxf ${DEMO_DIR}/${GLUE_META}
+  fi
+
+  if [[ ! -f ${HOME_DIR}/$ ]]; then
+    logging info "Glue Sql file ${GLUE_META} not downloaded, downloading it ..."
+    aws s3 cp ${PATH_TO_BUCKET}/kylin_demo/$GLUE_SQL ${DEMO_DIR} --region ${CURRENT_REGION}
+  fi
+}
+
 function prepare_packages() {
   if [[ -f ${HOME_DIR}/.prepared_packages ]]; then
     logging warn "Packages already prepared, skip prepare ..."
@@ -931,6 +970,7 @@ function start_services_on_kylin() {
     sample_for_kylin
     start_kylin
     after_start_kylin
+    download_glue_meta
     touch ${HOME_DIR}/.first_run
   fi
   restart_kylin
diff --git a/clouds/aws.py b/clouds/aws.py
index c71ce95..8b944ea 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -23,7 +23,7 @@ from itertools import repeat
 from typing import Optional, Dict, List
 
 from constant.config import Config
-from constant.deployment import ScaleType, NodeType
+from constant.deployment import ScaleType, NodeType, MODE
 from constant.kylin_properties_params import KylinProperties
 from constant.path import KYLIN_PROPERTIES_TEMPLATE_DIR
 from constant.yaml_params import Params
@@ -40,6 +40,10 @@ class AWS:
         self.config = config
 
     @property
+    def kylin_mode(self):
+        return self.config[MODE.KYLIN.value]
+
+    @property
     def is_cluster_ready(self) -> bool:
         if self.is_instances_ready:
             return True
@@ -376,12 +380,14 @@ class AWS:
             # Spark Master
             KylinProperties.SPARK_MASTER.value: self.cloud_instance.get_target_cluster_spark_master_host(cluster_num),
         }
-        Utils.render_properties(params=params, cluster_num=cluster_num)
-    def upload_needed_files(self, tars: List, jars: List, scripts: List) -> None:
+        Utils.render_properties(params=params, cluster_num=cluster_num, kylin_mode=self.kylin_mode)
+
+    def upload_needed_files(self, tars: List, jars: List, scripts: List, demos: List) -> None:
         self.cloud_instance.upload_tars_to_s3(tars)
         self.cloud_instance.upload_jars_to_s3(jars)
         self.cloud_instance.upload_scripts_to_s3(scripts)
+        self.cloud_instance.upload_demos_to_s3(demos)
 
     def check_needed_files(self, tars: List, jars: List, scripts: List) -> None:
         self.cloud_instance.check_tars_on_s3(tars)
diff --git a/constant/deployment.py b/constant/deployment.py
index e7ca930..9b840d4 100644
--- a/constant/deployment.py
+++ b/constant/deployment.py
@@ -23,6 +23,7 @@ class DeployType(Enum):
     LIST = 'list'
     SCALE = 'scale'
     DESTROY = 'destroy'
+    DESTROY_ALL = 'destroy-all'
 
 
 class ScaleType(Enum):
@@ -38,3 +39,7 @@ class NodeType(Enum):
 class Cluster(Enum):
     ALL = 'all'
     DEFAULT = 'default'
+
+
+class MODE(Enum):
+    KYLIN = 'KYLIN_MODE'
diff --git a/constant/path.py b/constant/path.py
index 833a31f..8c39bcb 100644
--- a/constant/path.py
+++ b/constant/path.py
@@ -20,9 +20,11 @@ import os
 CUR_DIR = os.path.dirname(__file__)
 BACKUP_PATH = os.path.join(CUR_DIR, '..', 'backup')
 JARS_PATH = os.path.join(BACKUP_PATH, 'jars')
+DEMOS_PATH = os.path.join(BACKUP_PATH, 'demos')
 SCRIPTS_PATH = os.path.join(BACKUP_PATH, 'scripts')
 TARS_PATH = os.path.join(BACKUP_PATH, 'tars')
 
 KYLIN_PROPERTIES_DIR = os.path.join(BACKUP_PATH, 'properties')
+KYLIN_PROPERTIES_TEMPLATES_DIR = os.path.join(KYLIN_PROPERTIES_DIR, 'mode_templates')
 KYLIN_PROPERTIES_TEMPLATE_DIR = os.path.join(KYLIN_PROPERTIES_DIR, '{cluster_num}')
 PROPERTIES_TEMPLATE_DIR = os.path.join(KYLIN_PROPERTIES_DIR, 'templates')
diff --git a/constant/yaml_params.py b/constant/yaml_params.py
index 8482922..b319837 100644
--- a/constant/yaml_params.py
+++ b/constant/yaml_params.py
@@ -43,6 +43,9 @@ class Params(Enum):
     # Rds params
     DB_HOST = 'DbHost'
 
+    # Glue params
+    SUPPORT_GLUE = 'SupportGlue'
+
     # Static services params
     STATIC_SERVICES_PRIVATE_IP = 'StaticServicesNodePrivateIp'
     STATIC_SERVICES_PUBLIC_IP = 'StaticServicesNodePublicIp'
diff --git a/deploy.py b/deploy.py
index 6085064..96afecf 100644
--- a/deploy.py
+++ b/deploy.py
@@ -17,13 +17,33 @@
 
 import argparse
 import logging.config
+import os
+from typing import Dict
 
-from constant.deployment import DeployType, ScaleType, NodeType, Cluster
+import yaml
+from constant.deployment import DeployType, ScaleType, NodeType, Cluster, MODE
+from constant.server_mode import ServerMode
+from constant.yaml_files import File
+from constant.yaml_params import Params
+
+
+def deploy_on_aws(deploy_type: str, kylin_mode: str, scale_type: str, node_type: str, cluster: str = None) -> None:
+
+    # check destroy type
+    is_destroy_all = deploy_type == DeployType.DESTROY_ALL.value
+
+    # load config file
+    config = load_config_file(kylin_mode, is_destroy_all)
+
+    # modify the destroy params
+    if is_destroy_all:
+        deploy_type = DeployType.DESTROY.value
+        cluster = Cluster.ALL.value
 
-def deploy_on_aws(deploy_type: str, scale_type: str, node_type: str, cluster: str = None) -> None:
     from engine import Engine
-    aws_engine = Engine()
+    aws_engine = Engine(config)
+
     if not aws_engine.is_ec2_cluster:
         msg = f'Now only supported platform: EC2, please check `DEPLOY_PLATFORM`.'
         raise Exception(msg)
@@ -67,6 +87,19 @@ def deploy_on_aws(deploy_type: str, scale_type: str, node_type: str, cluster: st
         aws_engine.scale_nodes(scale_type, node_type, cluster)
 
 
+def load_config_file(kylin_mode: str, is_destroy_all: bool = False) -> Dict:
+    # load config file
+    d = os.path.dirname(__file__)
+    with open(os.path.join(d, File.CONFIG_YAML.value)) as stream:
+        config = yaml.safe_load(stream)
+    config[MODE.KYLIN.value] = kylin_mode
+
+    # change the default value of `ALWAYS_DESTROY_VPC_RDS_MONITOR`
+    if is_destroy_all:
+        config[Params.ALWAYS_DESTROY_VPC_RDS_MONITOR.value] = 'true'
+    return config
+
+
 if __name__ == '__main__':
     logging.config.fileConfig('logging.ini')
@@ -74,7 +107,11 @@ if __name__ == '__main__':
     parser.add_argument("--type", required=False, default=DeployType.LIST.value, dest='type',
                         choices=[e.value for e in DeployType],
                         help="Use 'deploy' to create a cluster or 'destroy' to delete a cluster "
-                             "or 'list' to list alive nodes.")
+                             "or 'list' to list alive nodes or 'destroy-all' to delete everything.")
+    parser.add_argument("--mode", required=False, default=ServerMode.JOB.value, dest='kylin_mode',
+                        choices=[e.value for e in ServerMode],
+                        help="Select a 'mode' for kylin which mode will be in [all, query, job] in a cluster, "
+                             "default mode is all.")
     parser.add_argument("--scale-type", required=False, dest='scale_type',
                         choices=[e.value for e in ScaleType],
                         help="This param must be used with '--type' and '--node-type' "
@@ -96,4 +133,4 @@ if __name__ == '__main__':
                              "This param must be used with '--type', "
                              "`default` is for to deploy or destroy nodes of `default` cluster.")
     args = parser.parse_args()
-    deploy_on_aws(args.type, args.scale_type, args.node_type, args.cluster)
+    deploy_on_aws(args.type, args.kylin_mode, args.scale_type, args.node_type, args.cluster)
diff --git a/engine.py b/engine.py
index ee2f6c0..f43d438 100644
--- a/engine.py
+++ b/engine.py
@@ -32,13 +32,9 @@ logger = logging.getLogger(__name__)
 
 class Engine:
 
-    def __init__(self) -> None:
-        d = os.path.dirname(__file__)
-        with open(os.path.join(d, File.CONFIG_YAML.value)) as stream:
-            config = yaml.safe_load(stream)
+    def __init__(self, config) -> None:
         self.config = config
         self.is_ec2_cluster = self.config[Config.DEPLOY_PLATFORM.value] == Client.EC2.value
-        self.server_mode = None
         self.engine_utils = EngineUtils(self.config)
 
     def launch_default_cluster(self):
@@ -117,6 +113,7 @@ class Engine:
             return
         self.engine_utils.download_tars()
         self.engine_utils.download_jars()
+        self.engine_utils.download_demo()
         self.engine_utils.upload_needed_files()
         # check again
         assert self.is_inited_env()
diff --git a/engine_utils.py b/engine_utils.py
index acbc90e..5f432e7 100644
--- a/engine_utils.py
+++ b/engine_utils.py
@@ -82,6 +82,11 @@ class EngineUtils:
             jars.append(alluxio_client)
         return jars
 
+    def need_demo_files(self) -> list:
+        demo_meta = 'meta_backups.tgz'
+        demo_sql = 'create_kylin_demo_table.sql'
+        return [demo_meta, demo_sql]
+
     @staticmethod
     def needed_scripts() -> List:
         kylin = 'prepare-ec2-env-for-kylin4.sh'
@@ -222,9 +227,15 @@ class EngineUtils:
         for jar in jars:
             Utils.download_jar(jar)
 
+    def download_demo(self) -> None:
+        logger.info("Downloading demo files.")
+        files = self.need_demo_files()
+        for file in files:
+            Utils.download_demo(file)
+
     def upload_needed_files(self) -> None:
         logger.info("Start to uploading tars.")
-        self.aws.upload_needed_files(self.needed_tars(), self.needed_jars(), self.needed_scripts())
+        self.aws.upload_needed_files(self.needed_tars(), self.needed_jars(), self.needed_scripts(), self.need_demo_files())
         logger.info("Uploaded tars successfully.")
 
     def check_needed_files(self) -> None:
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index c256fa7..8be37e8 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -29,8 +29,9 @@ from botocore.exceptions import ClientError, WaiterError, ParamValidationError
 from constant.client import Client
 from constant.commands import Commands
 from constant.config import Config
-from constant.deployment import NodeType, Cluster
-from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_TEMPLATE_DIR
+from constant.deployment import NodeType, Cluster, MODE
+from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_TEMPLATE_DIR, DEMOS_PATH
+from constant.server_mode import ServerMode
 from constant.yaml_files import File
 from constant.yaml_params import Params
 from utils import Utils
@@ -51,6 +52,14 @@ class AWSInstance:
         self.db_host = None
 
     @property
+    def kylin_mode(self) -> str:
+        return self.config[MODE.KYLIN.value]
+
+    @property
+    def is_support_glue(self) -> bool:
+        return self.kylin_mode == ServerMode.JOB.value
+
+    @property
     def is_associated_public_ip(self) -> bool:
         return self.config[Params.ASSOSICATED_PUBLIC_IP.value] == 'true'
 
@@ -289,6 +298,10 @@ class AWSInstance:
         return self.bucket_dir + 'jars/'
 
     @property
+    def bucket_demos_dir(self) -> str:
+        return self.bucket_dir + 'kylin_demo/'
+
+    @property
     def iam_role(self) -> str:
         return self.config[Config.IAM.value]
 
@@ -669,6 +682,11 @@ class AWSInstance:
         params[Params.DB_HOST.value] = self.get_db_host()
         params[Params.CLUSTER_NUM.value] = str(cluster_num) if cluster_num else 'default'
 
+        # make that spark master don't need to support glue
+        if not self.is_support_glue:
+            # note: this param must be the type of `str`
+            params[Params.SUPPORT_GLUE.value] = 'false'
+
         resp = self.create_stack(
             stack_name=kylin_stack_name,
             file_path=self.path_of_kylin_stack,
@@ -802,6 +820,11 @@ class AWSInstance:
         params[Params.CLUSTER_NUM.value] = str(cluster_num)
         params[Params.IS_SCALED.value] = 'true'
 
+        # make that spark master don't need to support glue
+        if not self.is_support_glue:
+            # note: this param must be the type of `str`
+            params[Params.SUPPORT_GLUE.value] = 'false'
+
         resp = self.create_stack(
             stack_name=stack_name,
             file_path=self.path_of_kylin_scale_stack,
@@ -879,6 +902,12 @@ class AWSInstance:
         # update needed params
         params = self.update_basic_params(params)
         params[Params.DB_HOST.value] = self.get_db_host()
+
+        # make that spark master don't need to support glue
+        if not self.is_support_glue:
+            # note: this param must be the type of `str`
+            params[Params.SUPPORT_GLUE.value] = 'false'
+
         resp = self.create_stack(
             stack_name=spark_master_stack_name,
             file_path=self.path_of_spark_master_stack,
@@ -956,6 +985,11 @@ class AWSInstance:
         params = self.update_basic_params(params)
         params[Params.SPARK_MASTER_HOST.value] = self.get_spark_master_host_by_name(spark_master_stack)
 
+        # make that spark master don't need to support glue
+        if not self.is_support_glue:
+            # note: this param must be the type of `str`
+            params[Params.SUPPORT_GLUE.value] = 'false'
+
         resp = self.create_stack(
             stack_name=spark_worker_stack,
             file_path=self.path_of_spark_slave_stack,
@@ -1122,6 +1156,11 @@ class AWSInstance:
         params[Params.SPARK_MASTER_HOST.value] = self.get_spark_master_host_by_name(spark_master_stack)
         params[Params.SPARK_WORKER_NUM.value] = str(worker_num)
 
+        # make that spark master don't need to support glue
+        if not self.is_support_glue:
+            # note: this param must be the type of `str`
+            params[Params.SUPPORT_GLUE.value] = 'false'
+
         resp = self.create_stack(
             stack_name=stack_name,
             file_path=self.path_of_spark_slave_scaled_stack,
@@ -1748,17 +1787,39 @@ class AWSInstance:
         return handled_outputs
 
     def alive_nodes(self, cluster_nums: List = None) -> None:
+        msgs = self._get_live_nodes_messages(cluster_nums=cluster_nums)
+        logger.info(f"Fetching messages successfully ...")
+
+        header_msg = '\n=================== List Alive Nodes ===========================\n'
+        result = header_msg + f"Stack Name\t\tInstance ID\t\tPrivate Ip\t\tPublic Ip\t\t\n"
+        for msg in msgs:
+            result += msg + '\n'
+        result += header_msg
+        logger.info(result)
+
+    def _get_live_nodes_messages(self, cluster_nums: List) -> List:
         # default cluster
+        logger.info("Fetching static service node messages ...")
         static_msg = self.get_static_services_basic_msg()
+
+        logger.info(f"Fetching kylin node messages ...")
         kylin_msg = self.get_kylin_basic_msg()
+
+        logger.info(f"Fetching spark master node messages ...")
         spark_master_msg = self.get_spark_master_msg()
 
         msgs = [m for m in [static_msg, kylin_msg, spark_master_msg] if m]
 
+        logger.info(f"Fetching spark worker nodes messages ...")
         spark_slaves_msg = self.get_spark_slaves_basic_msg()
+
+        logger.info(f"Fetching Zookeeper nodes messages ...")
         zks_msg = self.get_zks_basic_msg()
 
+        logger.info(f"Fetching scaled kylin nodes messages ...")
         scaled_kylins_msg = self.get_scaled_kylin_basic_msg()
+
+        logger.info(f"Fetching scaled spark worker nodes messages ...")
         scaled_spark_workers_msg = self.get_scaled_spark_workers_basic_msg()
 
         msgs.extend(zks_msg)
@@ -1770,10 +1831,14 @@ class AWSInstance:
         if cluster_nums is None:
             cluster_nums = []
 
         for num in cluster_nums:
+            logger.info(f"Fetching zookeepers nodes of cluster {num} messages ...")
             zks_msg_of_target_cluster = self.get_zks_of_target_cluster_msg(num)
             msgs.extend(zks_msg_of_target_cluster)
 
+            logger.info(f"Fetching spark master nodes of cluster {num} messages ...")
             spark_master_msg_of_target_cluster = self.get_spark_master_of_target_cluster_msg(num)
+
+            logger.info(f"Fetching kylin nodes of cluster {num} messages ...")
             kylin_msg_of_target_cluster = self.get_kylin_basic_msg_of_target_cluster(num)
 
             for msg in [spark_master_msg_of_target_cluster, kylin_msg_of_target_cluster]:
@@ -1781,20 +1846,19 @@ class AWSInstance:
                 continue
             msgs.append(msg)
 
+            logger.info(f"Fetching spark worker nodes of cluster {num} messages ...")
             spark_workers_msgs_of_target_cluster = self.get_spark_workers_of_target_cluster_msg(num)
             msgs.extend(spark_workers_msgs_of_target_cluster)
 
+            logger.info(f"Fetching scaled kylin nodes of cluster {num} messages ...")
             scaled_kylins_msg_of_target_cluster = self.get_scaled_kylin_basic_msg_of_target_cluster(num)
+
+            logger.info(f"Fetching scaled spark worker nodes of cluster {num} messages ...")
             scaled_workers_msg_of_target_cluster = self.get_scaled_spark_workers_basic_msg_of_target_cluster(num)
 
             msgs.extend(scaled_kylins_msg_of_target_cluster)
             msgs.extend(scaled_workers_msg_of_target_cluster)
 
-        header_msg = '\n=================== List Alive Nodes ===========================\n'
-        result = header_msg + f"Stack Name\t\tInstance ID\t\tPrivate Ip\t\tPublic Ip\t\t\n"
-        for msg in msgs:
-            result += msg + '\n'
-        result += header_msg
-        logger.info(result)
+        return msgs
 
     def is_ec2_stacks_ready(self) -> bool:
         if not (
@@ -2045,6 +2109,13 @@ class AWSInstance:
                 continue
             self.upload_file_to_s3(SCRIPTS_PATH, script, self.bucket, self.bucket_scripts_dir)
 
+    def upload_demos_to_s3(self, demos: List) -> None:
+        for demo in demos:
+            if self.is_object_exists_on_s3(demo, self.bucket, self.bucket_demos_dir):
+                logger.info(f'{demo} already exists, skip upload it.')
+                continue
+            self.upload_file_to_s3(DEMOS_PATH, demo, self.bucket, self.bucket_demos_dir)
+
     def upload_kylin_properties_to_s3(self, cluster_num: int = None, properties_file: str = 'kylin.properties') -> None:
         # Always to upload local kylin.properties
         local_dir = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
diff --git a/kylin_configs.yaml b/kylin_configs.yaml
index d924459..fcf2020 100644
--- a/kylin_configs.yaml
+++ b/kylin_configs.yaml
@@ -54,7 +54,9 @@ CIDR_IP: ${Cidr Ip}
 # If you set `SUPPORT_GLUE` to be `true`, then please make sure that you using Kylin only support `Job` mode, not `Query` as same as `All`.
 # Because `All` will support `Query`. So you need to change the `kylin.server.mode` in `kylin.properties` to be `Job` or `All`.
 # If you set `All` mode for Kylin, there will be a error when you query any sql.
-SUPPORT_GLUE: &SUPPORT_GLUE 'false'
+# This parameter will be set automatically by `kylin_mode`. Such as if `kylin_mode` is 'job', `SUPPORT_GLUE` will be 'true', and other mode(`query` or `all`) will be 'false'.
+# So user don't need to care about this parameters.
+SUPPORT_GLUE: &SUPPORT_GLUE 'true'
 
 # Enable using MDX
 ENABLE_MDX: &ENABLE_MDX 'false'
diff --git a/utils.py b/utils.py
index 568e82d..833d4de 100644
--- a/utils.py
+++ b/utils.py
@@ -32,7 +32,10 @@ from constant.path import (
     RENDERED_FILE,
     PROPERTIES_TEMPLATE_DIR,
     TEMPLATE_OF_KYLIN_PROPERTIES,
+    KYLIN_PROPERTIES_TEMPLATES_DIR,
+    DEMOS_PATH,
 )
+from constant.server_mode import ServerMode
 
 logger = logging.getLogger(__name__)
@@ -91,6 +94,13 @@ class Utils:
         Utils.is_downloaded_success(filename=filename, dest_folder=JARS_PATH)
 
     @staticmethod
+    def download_demo(filename: str) -> str:
+        base_url = Utils.DOWNLOAD_BASE_URL + '/kylin_demo/'
+        url = base_url + filename
+        Utils.download(url=url, dest_folder=DEMOS_PATH, filename=filename)
+        Utils.is_downloaded_success(filename=filename, dest_folder=DEMOS_PATH)
+
+    @staticmethod
     def download(url: str, dest_folder: str, filename: str) -> None:
         if not Utils.is_file_exists(dest_folder):
             # create folder if it does not exist
@@ -139,20 +149,26 @@ class Utils:
             yield f
 
     @staticmethod
-    def render_properties(params: Dict, cluster_num: int = None, properties_template: str = 'kylin.properties') -> None:
-        search_path = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
+    def render_properties(params: Dict,
+                          cluster_num: int = None,
+                          properties_file: str = 'kylin.properties',
+                          kylin_mode: str = 'all') -> None:
+        target_path = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
 
-        dest_path = os.path.join(search_path, 'kylin.properties')
-        rendered_file = os.path.join(search_path, RENDERED_FILE)
+        # replace a matched kylin.properties for a kylin in cluster
+        Utils.replace_property_for_kylin(target_path, kylin_mode)
+
+        dest_path = os.path.join(target_path, 'kylin.properties')
+        rendered_file = os.path.join(target_path, RENDERED_FILE)
         if Utils.is_file_exists(rendered_file):
             logger.info(f'{dest_path} already rendered. Skip render it again.')
             return
-        env = Environment(loader=FileSystemLoader(searchpath=search_path))
+        env = Environment(loader=FileSystemLoader(searchpath=target_path))
         try:
-            template = env.get_template(properties_template)
+            template = env.get_template(properties_file)
         except TemplateNotFound:
-            raise Exception(f'Properties template: {properties_template} not in the path: {search_path}.\n '
+            raise Exception(f'Properties template: {properties_file} not in the path: {target_path}.\n '
                             f'Please copy the needed kylin.properties template in `backup/properties/templates` '
                             f'to `backup/properties/{cluster_num}`\n. If `backup/properties/{cluster_num}` not exists, '
                             f'please make it and rename the template file to `kylin.properties` in this dir.')
@@ -166,9 +182,19 @@ class Utils:
         logger.info(f'Current {dest_path} rendered.')
 
     @staticmethod
-    def refresh_kylin_properties(properties_template: str = 'kylin.properties') -> None:
+    def replace_property_for_kylin(target_path: str, kylin_mode: str = 'all', properties_file: str = 'kylin.properties'):
+        # default kylin.properties is for 'all' mode to a kylin node
+        if kylin_mode == ServerMode.ALL.value:
+            return
+
+        source_file = os.path.join(KYLIN_PROPERTIES_TEMPLATES_DIR, kylin_mode, properties_file)
+        destination_file = os.path.join(target_path, properties_file)
+        shutil.copy(source_file, destination_file)
+
+    @staticmethod
+    def refresh_kylin_properties(properties_file: str = 'kylin.properties') -> None:
         Utils.refresh_kylin_properties_in_clusters()
-        Utils.refresh_kylin_properties_in_default(properties_template=properties_template)
+        Utils.refresh_kylin_properties_in_default(properties_template=properties_file)
 
     @staticmethod
     def refresh_kylin_properties_in_clusters(cluster_nums: List[int] = None) -> None:
@@ -220,9 +246,11 @@ class Utils:
         if filename not in Utils.FILES_SIZE_IN_BYTES.keys():
             logger.warning(f'Current file {filename} is not the matched version, skip check file size.')
             return
-        assert Utils.FILES_SIZE_IN_BYTES[filename] == Utils.size_in_bytes(dest_folder=dest_folder, filename=filename), \
-            f'{filename} size should be {Utils.FILES_SIZE_IN_BYTES[filename]} bytes' \
-            f'not {Utils.size_in_bytes(dest_folder, filename)} bytes, please check.'
+        expected_size = Utils.FILES_SIZE_IN_BYTES[filename]
+        real_size = Utils.size_in_bytes(dest_folder=dest_folder, filename=filename)
+
+        assert expected_size == real_size, \
+            f'{filename} size should be {expected_size} bytes not {real_size} bytes, please check.'
         logger.info(f'Downloaded file {filename} successfully.')
 
     @staticmethod