This is an automated email from the ASF dual-hosted git repository.
yaqian pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
new 596b8cb Fix 0328 (#1837)
596b8cb is described below
commit 596b8cb36f37167c1669e6f3d19db636c89eea27
Author: Tengting Xu <[email protected]>
AuthorDate: Wed Mar 30 09:48:56 2022 +0800
Fix 0328 (#1837)
* # support different mode of kylin & auto enable glue by the mode of kylin
* # add log for fetching nodes messages
* minor fix
* # minor fix
* # minor fix
---
.gitignore | 3 +-
backup/properties/default/kylin.properties | 1 +
.../all}/kylin.properties | 1 +
.../job}/kylin.properties | 18 +++--
.../query}/kylin.properties | 8 +-
.../properties/templates/kylin.properties.template | 1 +
backup/scripts/prepare-ec2-env-for-kylin4.sh | 40 ++++++++++
clouds/aws.py | 12 ++-
constant/deployment.py | 5 ++
constant/path.py | 2 +
constant/yaml_params.py | 3 +
deploy.py | 47 ++++++++++--
engine.py | 7 +-
engine_utils.py | 13 +++-
instances/aws_instance.py | 87 ++++++++++++++++++++--
kylin_configs.yaml | 4 +-
utils.py | 52 ++++++++++---
17 files changed, 256 insertions(+), 48 deletions(-)
diff --git a/.gitignore b/.gitignore
index a087ce7..51a5140 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,5 @@ __pycache__/
.idea
logs/
backup/jars
-backup/tars
\ No newline at end of file
+backup/tars
+backup/demos
diff --git a/backup/properties/default/kylin.properties b/backup/properties/default/kylin.properties
index aecaa8f..3cc83dd 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/default/kylin.properties
@@ -44,6 +44,7 @@
kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
## Query Engine Resource
kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
kylin.query.spark-conf.spark.driver.cores=1
kylin.query.spark-conf.spark.driver.memory=8GB
kylin.query.spark-conf.spark.driver.memoryOverhead=1G
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/all/kylin.properties
similarity index 97%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/all/kylin.properties
index aecaa8f..3cc83dd 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/all/kylin.properties
@@ -44,6 +44,7 @@
kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
## Query Engine Resource
kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
kylin.query.spark-conf.spark.driver.cores=1
kylin.query.spark-conf.spark.driver.memory=8GB
kylin.query.spark-conf.spark.driver.memoryOverhead=1G
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/job/kylin.properties
similarity index 86%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/job/kylin.properties
index aecaa8f..3bbe87b 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/job/kylin.properties
@@ -17,7 +17,7 @@
# Kylin server mode, valid value [all, query, job]
-kylin.server.mode=all
+kylin.server.mode=job
kylin.metadata.url=kylin_metadata@jdbc,url=jdbc:mysql://{{ DB_HOST }}:{{ DB_PORT }}/kylin,username=root,password={{ DB_PASSWORD }},maxActive=10,maxIdle=10
kylin.env.zookeeper-connect-string={{ ZOOKEEPER_HOST }}
kylin.env.hdfs-working-dir=s3a:/{{ S3_BUCKET_PATH }}/working_dir/
@@ -27,8 +27,10 @@
kylin.engine.spark-conf.spark.history.fs.logDirectory=s3a:/{{ S3_BUCKET_PATH }}/
kylin.engine.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
kylin.cube.cubeplanner.enabled=false
+
+## Job Engine parameters
kylin.engine.spark-conf.spark.executor.cores=2
-kylin.engine.spark-conf.spark.executor.instances=4
+kylin.engine.spark-conf.spark.executor.instances=5
kylin.engine.spark-conf.spark.executor.memory=7GB
kylin.engine.spark-conf.spark.executor.memoryOverhead=1GB
@@ -45,12 +47,12 @@
kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
## Query Engine Resource
kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
kylin.query.spark-conf.spark.driver.cores=1
-kylin.query.spark-conf.spark.driver.memory=8GB
-kylin.query.spark-conf.spark.driver.memoryOverhead=1G
-kylin.query.spark-conf.spark.executor.instances=2
-kylin.query.spark-conf.spark.executor.cores=2
-kylin.query.spark-conf.spark.executor.memory=7G
-kylin.query.spark-conf.spark.executor.memoryOverhead=1G
+kylin.query.spark-conf.spark.driver.memory=512M
+kylin.query.spark-conf.spark.driver.memoryOverhead=256M
+kylin.query.spark-conf.spark.cores.max=1
+kylin.query.spark-conf.spark.executor.memory=512M
+kylin.query.spark-conf.spark.executor.memoryOverhead=256M
+
kylin.query.spark-conf.spark.sql.parquet.filterPushdown=false
kylin.query.spark-conf.spark.hadoop.parquet.filter.columnindex.enabled=true
diff --git a/backup/properties/default/kylin.properties b/backup/properties/mode_templates/query/kylin.properties
similarity index 95%
copy from backup/properties/default/kylin.properties
copy to backup/properties/mode_templates/query/kylin.properties
index aecaa8f..d1cb3e0 100644
--- a/backup/properties/default/kylin.properties
+++ b/backup/properties/mode_templates/query/kylin.properties
@@ -17,7 +17,7 @@
# Kylin server mode, valid value [all, query, job]
-kylin.server.mode=all
+kylin.server.mode=query
kylin.metadata.url=kylin_metadata@jdbc,url=jdbc:mysql://{{ DB_HOST }}:{{ DB_PORT }}/kylin,username=root,password={{ DB_PASSWORD }},maxActive=10,maxIdle=10
kylin.env.zookeeper-connect-string={{ ZOOKEEPER_HOST }}
kylin.env.hdfs-working-dir=s3a:/{{ S3_BUCKET_PATH }}/working_dir/
@@ -58,11 +58,11 @@
kylin.query.spark-conf.spark.hadoop.parquet.filter.columnindex.enabled=true
kylin.query.spark-conf.spark.ui.prometheus.enabled=true
kylin.query.spark-conf.spark.executor.processTreeMetrics.enabled=true
-## Disable canary
-kylin.canary.sparder-context-canary-enabled=false
+## Enable canary
+kylin.canary.sparder-context-canary-enabled=true
## Query Cache
kylin.query.cache-enabled=true
### Prepare for cluster mode
kylin.job.scheduler.default=100
-kylin.server.self-discovery-enabled=true
\ No newline at end of file
+kylin.server.self-discovery-enabled=true
diff --git a/backup/properties/templates/kylin.properties.template b/backup/properties/templates/kylin.properties.template
index aecaa8f..3cc83dd 100644
--- a/backup/properties/templates/kylin.properties.template
+++ b/backup/properties/templates/kylin.properties.template
@@ -44,6 +44,7 @@
kylin.engine.spark-conf.spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFile
## Query Engine Resource
kylin.query.spark-conf.spark.master=spark://{{ SPARK_MASTER }}:7077
+kylin.query.spark-conf.spark.cores.max=4
kylin.query.spark-conf.spark.driver.cores=1
kylin.query.spark-conf.spark.driver.memory=8GB
kylin.query.spark-conf.spark.driver.memoryOverhead=1G
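
For orientation, the properties layout these copies introduce looks roughly like the sketch below (reconstructed from the diffstat and constant/path.py in this commit; exact file contents are illustrative only):

    backup/properties/
        default/kylin.properties            # rendered for the default cluster
        {cluster_num}/kylin.properties      # per-cluster working copies
        mode_templates/
            all/kylin.properties            # kylin.server.mode=all
            job/kylin.properties            # kylin.server.mode=job, larger build executors, small query driver
            query/kylin.properties          # kylin.server.mode=query, sparder canary enabled
        templates/kylin.properties.template

The new Utils.replace_property_for_kylin helper later in this diff copies mode_templates/<mode>/kylin.properties over the target directory before template rendering; the 'all' mode keeps the existing default file.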
diff --git a/backup/scripts/prepare-ec2-env-for-kylin4.sh b/backup/scripts/prepare-ec2-env-for-kylin4.sh
index bfdfdfa..6428d84 100644
--- a/backup/scripts/prepare-ec2-env-for-kylin4.sh
+++ b/backup/scripts/prepare-ec2-env-for-kylin4.sh
@@ -194,6 +194,10 @@ HIVE_PACKAGE=apache-hive-${HIVE_VERSION}-bin.tar.gz
NODE_EXPORTER_PACKAGE=node_exporter-1.3.1.linux-amd64.tar.gz
MDX_PACKAGE=mdx-kylin-${MDX_VERSION}.tar.gz
+# Glue needed files
+GLUE_META=meta_backups.tgz
+GLUE_SQL=create_kylin_demo_table.sql
+
### Parameter for JDK 1.8
JDK_PACKAGE=jdk-8u301-linux-x64.tar.gz
JDK_DECOMPRESS_NAME=jdk1.8.0_301
@@ -206,6 +210,11 @@ function init_env() {
mkdir ${HADOOP_DIR}
fi
+ DEMO_DIR=${HOME_DIR}/kylin_demo
+ if [[ ! -d $DEMO_DIR ]]; then
+ mkdir ${DEMO_DIR}
+ fi
+
JAVA_HOME=/usr/local/java
JRE_HOME=${JAVA_HOME}/jre
HADOOP_HOME=${HADOOP_DIR}/hadoop-${HADOOP_VERSION}
@@ -585,6 +594,19 @@
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
EOF
+
+ # Support to customize spark-defaults.conf
+ cat <<EOF > ${SPARK_HOME}/conf/spark-defaults.conf
+spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
+spark.hadoop.fs.s3a.endpoint s3.${CURRENT_REGION}.amazonaws.com.cn
+spark.master spark://${spark_master_node_private_ip}:7077
+spark.driver.cores 1
+spark.driver.memory 8G
+spark.driver.memoryOverhead 1G
+spark.executor.memory 7G
+spark.executor.memoryOverhead 1G
+EOF
+
logging info "Spark inited ..."
touch ${HOME_DIR}/.inited_spark
logging info "Spark is ready ..."
@@ -891,6 +913,23 @@ function start_node_exporter() {
nohup ${NODE_EXPORTER_HOME}/node_exporter >>${NODE_EXPORTER_HOME}/node.log 2>&1 &
}
+function download_glue_meta() {
+ if [[ ! -f ${DEMO_DIR}/$GLUE_META ]]; then
+ logging info "Glue Meta package ${GLUE_META} not downloaded, downloading it ..."
+ aws s3 cp ${PATH_TO_BUCKET}/kylin_demo/${GLUE_META} ${DEMO_DIR} --region ${CURRENT_REGION}
+ fi
+
+ if [[ ! -d ${HOME_DIR}/kylin_demo/${GLUE_META%*.tgz} ]]; then
+ logging info "Decompressing package ${GLUE_META} ..."
+ tar -zxf ${DEMO_DIR}/${GLUE_META}
+ fi
+
+ if [[ ! -f ${DEMO_DIR}/$GLUE_SQL ]]; then
+ logging info "Glue SQL file ${GLUE_SQL} not downloaded, downloading it ..."
+ aws s3 cp ${PATH_TO_BUCKET}/kylin_demo/$GLUE_SQL ${DEMO_DIR} --region ${CURRENT_REGION}
+ fi
+}
+
function prepare_packages() {
if [[ -f ${HOME_DIR}/.prepared_packages ]]; then
logging warn "Packages already prepared, skip prepare ..."
@@ -931,6 +970,7 @@ function start_services_on_kylin() {
sample_for_kylin
start_kylin
after_start_kylin
+ download_glue_meta
touch ${HOME_DIR}/.first_run
fi
restart_kylin
diff --git a/clouds/aws.py b/clouds/aws.py
index c71ce95..8b944ea 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -23,7 +23,7 @@ from itertools import repeat
from typing import Optional, Dict, List
from constant.config import Config
-from constant.deployment import ScaleType, NodeType
+from constant.deployment import ScaleType, NodeType, MODE
from constant.kylin_properties_params import KylinProperties
from constant.path import KYLIN_PROPERTIES_TEMPLATE_DIR
from constant.yaml_params import Params
@@ -40,6 +40,10 @@ class AWS:
self.config = config
@property
+ def kylin_mode(self):
+ return self.config[MODE.KYLIN.value]
+
+ @property
def is_cluster_ready(self) -> bool:
if self.is_instances_ready:
return True
@@ -376,12 +380,14 @@ class AWS:
# Spark Master
KylinProperties.SPARK_MASTER.value: self.cloud_instance.get_target_cluster_spark_master_host(cluster_num),
}
- Utils.render_properties(params=params, cluster_num=cluster_num)
- def upload_needed_files(self, tars: List, jars: List, scripts: List) -> None:
+ Utils.render_properties(params=params, cluster_num=cluster_num, kylin_mode=self.kylin_mode)
+
+ def upload_needed_files(self, tars: List, jars: List, scripts: List, demos: List) -> None:
 self.cloud_instance.upload_tars_to_s3(tars)
 self.cloud_instance.upload_jars_to_s3(jars)
 self.cloud_instance.upload_scripts_to_s3(scripts)
+ self.cloud_instance.upload_demos_to_s3(demos)
 def check_needed_files(self, tars: List, jars: List, scripts: List) -> None:
self.cloud_instance.check_tars_on_s3(tars)
diff --git a/constant/deployment.py b/constant/deployment.py
index e7ca930..9b840d4 100644
--- a/constant/deployment.py
+++ b/constant/deployment.py
@@ -23,6 +23,7 @@ class DeployType(Enum):
LIST = 'list'
SCALE = 'scale'
DESTROY = 'destroy'
+ DESTROY_ALL = 'destroy-all'
class ScaleType(Enum):
@@ -38,3 +39,7 @@ class NodeType(Enum):
class Cluster(Enum):
ALL = 'all'
DEFAULT = 'default'
+
+
+class MODE(Enum):
+ KYLIN = 'KYLIN_MODE'
diff --git a/constant/path.py b/constant/path.py
index 833a31f..8c39bcb 100644
--- a/constant/path.py
+++ b/constant/path.py
@@ -20,9 +20,11 @@ import os
CUR_DIR = os.path.dirname(__file__)
BACKUP_PATH = os.path.join(CUR_DIR, '..', 'backup')
JARS_PATH = os.path.join(BACKUP_PATH, 'jars')
+DEMOS_PATH = os.path.join(BACKUP_PATH, 'demos')
SCRIPTS_PATH = os.path.join(BACKUP_PATH, 'scripts')
TARS_PATH = os.path.join(BACKUP_PATH, 'tars')
KYLIN_PROPERTIES_DIR = os.path.join(BACKUP_PATH, 'properties')
+KYLIN_PROPERTIES_TEMPLATES_DIR = os.path.join(KYLIN_PROPERTIES_DIR, 'mode_templates')
KYLIN_PROPERTIES_TEMPLATE_DIR = os.path.join(KYLIN_PROPERTIES_DIR, '{cluster_num}')
PROPERTIES_TEMPLATE_DIR = os.path.join(KYLIN_PROPERTIES_DIR, 'templates')
diff --git a/constant/yaml_params.py b/constant/yaml_params.py
index 8482922..b319837 100644
--- a/constant/yaml_params.py
+++ b/constant/yaml_params.py
@@ -43,6 +43,9 @@ class Params(Enum):
# Rds params
DB_HOST = 'DbHost'
+ # Glue params
+ SUPPORT_GLUE = 'SupportGlue'
+
# Static services params
STATIC_SERVICES_PRIVATE_IP = 'StaticServicesNodePrivateIp'
STATIC_SERVICES_PUBLIC_IP = 'StaticServicesNodePublicIp'
diff --git a/deploy.py b/deploy.py
index 6085064..96afecf 100644
--- a/deploy.py
+++ b/deploy.py
@@ -17,13 +17,33 @@
import argparse
import logging.config
+import os
+from typing import Dict
-from constant.deployment import DeployType, ScaleType, NodeType, Cluster
+import yaml
+from constant.deployment import DeployType, ScaleType, NodeType, Cluster, MODE
+from constant.server_mode import ServerMode
+from constant.yaml_files import File
+from constant.yaml_params import Params
+
+
+def deploy_on_aws(deploy_type: str, kylin_mode: str, scale_type: str, node_type: str, cluster: str = None) -> None:
+
+ # check destroy type
+ is_destroy_all = deploy_type == DeployType.DESTROY_ALL.value
+
+ # load config file
+ config = load_config_file(kylin_mode, is_destroy_all)
+
+ # modify the destroy params
+ if is_destroy_all:
+ deploy_type = DeployType.DESTROY.value
+ cluster = Cluster.ALL.value
-def deploy_on_aws(deploy_type: str, scale_type: str, node_type: str, cluster: str = None) -> None:
from engine import Engine
- aws_engine = Engine()
+ aws_engine = Engine(config)
+
if not aws_engine.is_ec2_cluster:
msg = f'Now only supported platform: EC2, please check `DEPLOY_PLATFORM`.'
raise Exception(msg)
@@ -67,6 +87,19 @@ def deploy_on_aws(deploy_type: str, scale_type: str, node_type: str, cluster: st
aws_engine.scale_nodes(scale_type, node_type, cluster)
+def load_config_file(kylin_mode: str, is_destroy_all: bool = False) -> Dict:
+ # load config file
+ d = os.path.dirname(__file__)
+ with open(os.path.join(d, File.CONFIG_YAML.value)) as stream:
+ config = yaml.safe_load(stream)
+ config[MODE.KYLIN.value] = kylin_mode
+
+ # change the default value of `ALWAYS_DESTROY_VPC_RDS_MONITOR`
+ if is_destroy_all:
+ config[Params.ALWAYS_DESTROY_VPC_RDS_MONITOR.value] = 'true'
+ return config
+
+
if __name__ == '__main__':
logging.config.fileConfig('logging.ini')
@@ -74,7 +107,11 @@ if __name__ == '__main__':
parser.add_argument("--type", required=False,
default=DeployType.LIST.value, dest='type',
choices=[e.value for e in DeployType],
help="Use 'deploy' to create a cluster or 'destroy' to
delete a cluster "
- "or 'list' to list alive nodes.")
+ "or 'list' to list alive nodes or 'destroy-all'
to delete everything.")
+ parser.add_argument("--mode", required=False,
default=ServerMode.JOB.value, dest='kylin_mode',
+ choices=[e.value for e in ServerMode],
+ help="Select a 'mode' for kylin which mode will be in
[all, query, job] in a cluster, "
+ "default mode is all.")
parser.add_argument("--scale-type", required=False, dest='scale_type',
choices=[e.value for e in ScaleType],
help="This param must be used with '--type' and
'--node-type' "
@@ -96,4 +133,4 @@ if __name__ == '__main__':
"This param must be used with '--type', "
"`default` is for to deploy or destroy nodes of
`default` cluster.")
args = parser.parse_args()
- deploy_on_aws(args.type, args.scale_type, args.node_type, args.cluster)
+ deploy_on_aws(args.type, args.kylin_mode, args.scale_type, args.node_type, args.cluster)
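
For context, the new --mode flag above is an ordinary argparse option, so a hypothetical invocation of the script (command lines are illustrative; the flag values are the DeployType/ServerMode choices shown in this diff) would look like:

    python deploy.py --type deploy --mode job
    python deploy.py --type destroy-all --mode job

load_config_file then stores the chosen mode in the config dict under the 'KYLIN_MODE' key (MODE.KYLIN.value), which the AWS.kylin_mode and AWSInstance.kylin_mode properties read later in this diff.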
diff --git a/engine.py b/engine.py
index ee2f6c0..f43d438 100644
--- a/engine.py
+++ b/engine.py
@@ -32,13 +32,9 @@ logger = logging.getLogger(__name__)
class Engine:
- def __init__(self) -> None:
- d = os.path.dirname(__file__)
- with open(os.path.join(d, File.CONFIG_YAML.value)) as stream:
- config = yaml.safe_load(stream)
+ def __init__(self, config) -> None:
self.config = config
self.is_ec2_cluster = self.config[Config.DEPLOY_PLATFORM.value] == Client.EC2.value
- self.server_mode = None
self.engine_utils = EngineUtils(self.config)
def launch_default_cluster(self):
@@ -117,6 +113,7 @@ class Engine:
return
self.engine_utils.download_tars()
self.engine_utils.download_jars()
+ self.engine_utils.download_demo()
self.engine_utils.upload_needed_files()
# check again
assert self.is_inited_env()
diff --git a/engine_utils.py b/engine_utils.py
index acbc90e..5f432e7 100644
--- a/engine_utils.py
+++ b/engine_utils.py
@@ -82,6 +82,11 @@ class EngineUtils:
jars.append(alluxio_client)
return jars
+ def need_demo_files(self) -> list:
+ demo_meta = 'meta_backups.tgz'
+ demo_sql = 'create_kylin_demo_table.sql'
+ return [demo_meta, demo_sql]
+
@staticmethod
def needed_scripts() -> List:
kylin = 'prepare-ec2-env-for-kylin4.sh'
@@ -222,9 +227,15 @@ class EngineUtils:
for jar in jars:
Utils.download_jar(jar)
+ def download_demo(self) -> None:
+ logger.info("Downloading demo files.")
+ files = self.need_demo_files()
+ for file in files:
+ Utils.download_demo(file)
+
def upload_needed_files(self) -> None:
logger.info("Start to uploading tars.")
- self.aws.upload_needed_files(self.needed_tars(), self.needed_jars(), self.needed_scripts())
+ self.aws.upload_needed_files(self.needed_tars(), self.needed_jars(), self.needed_scripts(), self.need_demo_files())
logger.info("Uploaded tars successfully.")
def check_needed_files(self) -> None:
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index c256fa7..8be37e8 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -29,8 +29,9 @@ from botocore.exceptions import ClientError, WaiterError, ParamValidationError
from constant.client import Client
from constant.commands import Commands
from constant.config import Config
-from constant.deployment import NodeType, Cluster
-from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_TEMPLATE_DIR
+from constant.deployment import NodeType, Cluster, MODE
+from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_TEMPLATE_DIR, DEMOS_PATH
+from constant.server_mode import ServerMode
from constant.yaml_files import File
from constant.yaml_params import Params
from utils import Utils
@@ -51,6 +52,14 @@ class AWSInstance:
self.db_host = None
@property
+ def kylin_mode(self) -> str:
+ return self.config[MODE.KYLIN.value]
+
+ @property
+ def is_support_glue(self) -> bool:
+ return self.kylin_mode == ServerMode.JOB.value
+
+ @property
def is_associated_public_ip(self) -> bool:
return self.config[Params.ASSOSICATED_PUBLIC_IP.value] == 'true'
@@ -289,6 +298,10 @@ class AWSInstance:
return self.bucket_dir + 'jars/'
@property
+ def bucket_demos_dir(self) -> str:
+ return self.bucket_dir + 'kylin_demo/'
+
+ @property
def iam_role(self) -> str:
return self.config[Config.IAM.value]
@@ -669,6 +682,11 @@ class AWSInstance:
params[Params.DB_HOST.value] = self.get_db_host()
params[Params.CLUSTER_NUM.value] = str(cluster_num) if cluster_num else 'default'
+ # make that spark master don't need to support glue
+ if not self.is_support_glue:
+ # note: this param must be the type of `str`
+ params[Params.SUPPORT_GLUE.value] = 'false'
+
resp = self.create_stack(
stack_name=kylin_stack_name,
file_path=self.path_of_kylin_stack,
@@ -802,6 +820,11 @@ class AWSInstance:
params[Params.CLUSTER_NUM.value] = str(cluster_num)
params[Params.IS_SCALED.value] = 'true'
+ # make that spark master don't need to support glue
+ if not self.is_support_glue:
+ # note: this param must be the type of `str`
+ params[Params.SUPPORT_GLUE.value] = 'false'
+
resp = self.create_stack(
stack_name=stack_name,
file_path=self.path_of_kylin_scale_stack,
@@ -879,6 +902,12 @@ class AWSInstance:
# update needed params
params = self.update_basic_params(params)
params[Params.DB_HOST.value] = self.get_db_host()
+
+ # make that spark master don't need to support glue
+ if not self.is_support_glue:
+ # note: this param must be the type of `str`
+ params[Params.SUPPORT_GLUE.value] = 'false'
+
resp = self.create_stack(
stack_name=spark_master_stack_name,
file_path=self.path_of_spark_master_stack,
@@ -956,6 +985,11 @@ class AWSInstance:
params = self.update_basic_params(params)
params[Params.SPARK_MASTER_HOST.value] = self.get_spark_master_host_by_name(spark_master_stack)
+ # make that spark master don't need to support glue
+ if not self.is_support_glue:
+ # note: this param must be the type of `str`
+ params[Params.SUPPORT_GLUE.value] = 'false'
+
resp = self.create_stack(
stack_name=spark_worker_stack,
file_path=self.path_of_spark_slave_stack,
@@ -1122,6 +1156,11 @@ class AWSInstance:
params[Params.SPARK_MASTER_HOST.value] = self.get_spark_master_host_by_name(spark_master_stack)
params[Params.SPARK_WORKER_NUM.value] = str(worker_num)
+ # make that spark master don't need to support glue
+ if not self.is_support_glue:
+ # note: this param must be the type of `str`
+ params[Params.SUPPORT_GLUE.value] = 'false'
+
resp = self.create_stack(
stack_name=stack_name,
file_path=self.path_of_spark_slave_scaled_stack,
@@ -1748,17 +1787,39 @@ class AWSInstance:
return handled_outputs
def alive_nodes(self, cluster_nums: List = None) -> None:
+ msgs = self._get_live_nodes_messages(cluster_nums=cluster_nums)
+ logger.info(f"Fetching messages successfully ...")
+
+ header_msg = '\n=================== List Alive Nodes ===========================\n'
+ result = header_msg + f"Stack Name\t\tInstance ID\t\tPrivate Ip\t\tPublic Ip\t\t\n"
+ for msg in msgs:
+ result += msg + '\n'
+ result += header_msg
+ logger.info(result)
+
+ def _get_live_nodes_messages(self, cluster_nums: List) -> List:
# default cluster
+ logger.info("Fetching static service node messages ...")
static_msg = self.get_static_services_basic_msg()
+
+ logger.info(f"Fetching kylin node messages ...")
kylin_msg = self.get_kylin_basic_msg()
+
+ logger.info(f"Fetching spark master node messages ...")
spark_master_msg = self.get_spark_master_msg()
msgs = [m for m in [static_msg, kylin_msg, spark_master_msg] if m]
+ logger.info(f"Fetching spark worker nodes messages ...")
spark_slaves_msg = self.get_spark_slaves_basic_msg()
+
+ logger.info(f"Fetching Zookeeper nodes messages ...")
zks_msg = self.get_zks_basic_msg()
+ logger.info(f"Fetching scaled kylin nodes messages ...")
scaled_kylins_msg = self.get_scaled_kylin_basic_msg()
+
+ logger.info(f"Fetching scaled spark worker nodes messages ...")
scaled_spark_workers_msg = self.get_scaled_spark_workers_basic_msg()
msgs.extend(zks_msg)
@@ -1770,10 +1831,14 @@ class AWSInstance:
if cluster_nums is None:
cluster_nums = []
for num in cluster_nums:
+ logger.info(f"Fetching zookeepers nodes of cluster {num} messages
...")
zks_msg_of_target_cluster = self.get_zks_of_target_cluster_msg(num)
msgs.extend(zks_msg_of_target_cluster)
+ logger.info(f"Fetching spark master nodes of cluster {num}
messages ...")
spark_master_msg_of_target_cluster =
self.get_spark_master_of_target_cluster_msg(num)
+
+ logger.info(f"Fetching kylin nodes of cluster {num} messages ...")
kylin_msg_of_target_cluster =
self.get_kylin_basic_msg_of_target_cluster(num)
for msg in [spark_master_msg_of_target_cluster,
kylin_msg_of_target_cluster]:
@@ -1781,20 +1846,19 @@ class AWSInstance:
continue
msgs.append(msg)
+ logger.info(f"Fetching spark worker nodes of cluster {num}
messages ...")
spark_workers_msgs_of_target_cluster =
self.get_spark_workers_of_target_cluster_msg(num)
msgs.extend(spark_workers_msgs_of_target_cluster)
+ logger.info(f"Fetching scaled kylin nodes of cluster {num}
messages ...")
scaled_kylins_msg_of_target_cluster =
self.get_scaled_kylin_basic_msg_of_target_cluster(num)
+
+ logger.info(f"Fetching scaled spark worker nodes of cluster {num}
messages ...")
scaled_workers_msg_of_target_cluster =
self.get_scaled_spark_workers_basic_msg_of_target_cluster(num)
msgs.extend(scaled_kylins_msg_of_target_cluster)
msgs.extend(scaled_workers_msg_of_target_cluster)
- header_msg = '\n=================== List Alive Nodes ===========================\n'
- result = header_msg + f"Stack Name\t\tInstance ID\t\tPrivate Ip\t\tPublic Ip\t\t\n"
- for msg in msgs:
- result += msg + '\n'
- result += header_msg
- logger.info(result)
+ return msgs
def is_ec2_stacks_ready(self) -> bool:
if not (
@@ -2045,6 +2109,13 @@ class AWSInstance:
continue
self.upload_file_to_s3(SCRIPTS_PATH, script, self.bucket, self.bucket_scripts_dir)
+ def upload_demos_to_s3(self, demos: List) -> None:
+ for demo in demos:
+ if self.is_object_exists_on_s3(demo, self.bucket, self.bucket_demos_dir):
+ logger.info(f'{demo} already exists, skip upload it.')
+ continue
+ self.upload_file_to_s3(DEMOS_PATH, demo, self.bucket, self.bucket_demos_dir)
+
def upload_kylin_properties_to_s3(self, cluster_num: int = None, properties_file: str = 'kylin.properties') -> None:
 # Always to upload local kylin.properties
 local_dir = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
diff --git a/kylin_configs.yaml b/kylin_configs.yaml
index d924459..fcf2020 100644
--- a/kylin_configs.yaml
+++ b/kylin_configs.yaml
@@ -54,7 +54,9 @@ CIDR_IP: ${Cidr Ip}
# If you set `SUPPORT_GLUE` to be `true`, then please make sure that you using Kylin only support `Job` mode, not `Query` as same as `All`.
# Because `All` will support `Query`. So you need to change the `kylin.server.mode` in `kylin.properties` to be `Job` or `All`.
# If you set `All` mode for Kylin, there will be a error when you query any sql.
-SUPPORT_GLUE: &SUPPORT_GLUE 'false'
+# This parameter will be set automatically by `kylin_mode`. Such as if `kylin_mode` is 'job', `SUPPORT_GLUE` will be 'true', and other mode(`query` or `all`) will be 'false'.
+# So user don't need to care about this parameters.
+SUPPORT_GLUE: &SUPPORT_GLUE 'true'
# Enable using MDX
ENABLE_MDX: &ENABLE_MDX 'false'
diff --git a/utils.py b/utils.py
index 568e82d..833d4de 100644
--- a/utils.py
+++ b/utils.py
@@ -32,7 +32,10 @@ from constant.path import (
RENDERED_FILE,
PROPERTIES_TEMPLATE_DIR,
TEMPLATE_OF_KYLIN_PROPERTIES,
+ KYLIN_PROPERTIES_TEMPLATES_DIR,
+ DEMOS_PATH,
)
+from constant.server_mode import ServerMode
logger = logging.getLogger(__name__)
@@ -91,6 +94,13 @@ class Utils:
Utils.is_downloaded_success(filename=filename, dest_folder=JARS_PATH)
@staticmethod
+ def download_demo(filename: str) -> str:
+ base_url = Utils.DOWNLOAD_BASE_URL + '/kylin_demo/'
+ url = base_url + filename
+ Utils.download(url=url, dest_folder=DEMOS_PATH, filename=filename)
+ Utils.is_downloaded_success(filename=filename, dest_folder=DEMOS_PATH)
+
+ @staticmethod
def download(url: str, dest_folder: str, filename: str) -> None:
if not Utils.is_file_exists(dest_folder):
# create folder if it does not exist
@@ -139,20 +149,26 @@ class Utils:
yield f
@staticmethod
- def render_properties(params: Dict, cluster_num: int = None, properties_template: str = 'kylin.properties') -> None:
- search_path = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
+ def render_properties(params: Dict,
+ cluster_num: int = None,
+ properties_file: str = 'kylin.properties',
+ kylin_mode: str = 'all') -> None:
+ target_path = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num=cluster_num if cluster_num else 'default')
- dest_path = os.path.join(search_path, 'kylin.properties')
- rendered_file = os.path.join(search_path, RENDERED_FILE)
+ # replace a matched kylin.properties for a kylin in cluster
+ Utils.replace_property_for_kylin(target_path, kylin_mode)
+
+ dest_path = os.path.join(target_path, 'kylin.properties')
+ rendered_file = os.path.join(target_path, RENDERED_FILE)
if Utils.is_file_exists(rendered_file):
logger.info(f'{dest_path} already rendered. Skip render it again.')
return
- env = Environment(loader=FileSystemLoader(searchpath=search_path))
+ env = Environment(loader=FileSystemLoader(searchpath=target_path))
try:
- template = env.get_template(properties_template)
+ template = env.get_template(properties_file)
except TemplateNotFound:
- raise Exception(f'Properties template: {properties_template} not in the path: {search_path}.\n '
+ raise Exception(f'Properties template: {properties_file} not in the path: {target_path}.\n '
 f'Please copy the needed kylin.properties template in `backup/properties/templates` '
 f'to `backup/properties/{cluster_num}`\n. If `backup/properties/{cluster_num}` not exists, '
 f'please make it and rename the template file to `kylin.properties` in this dir.')
@@ -166,9 +182,19 @@ class Utils:
logger.info(f'Current {dest_path} rendered.')
@staticmethod
- def refresh_kylin_properties(properties_template: str = 'kylin.properties') -> None:
+ def replace_property_for_kylin(target_path: str, kylin_mode: str = 'all', properties_file: str = 'kylin.properties'):
+ # default kylin.properties is for 'all' mode to a kylin node
+ if kylin_mode == ServerMode.ALL.value:
+ return
+
+ source_file = os.path.join(KYLIN_PROPERTIES_TEMPLATES_DIR, kylin_mode, properties_file)
+ destination_file = os.path.join(target_path, properties_file)
+ shutil.copy(source_file, destination_file)
+
+ @staticmethod
+ def refresh_kylin_properties(properties_file: str = 'kylin.properties') -> None:
 Utils.refresh_kylin_properties_in_clusters()
- Utils.refresh_kylin_properties_in_default(properties_template=properties_template)
+ Utils.refresh_kylin_properties_in_default(properties_template=properties_file)
@staticmethod
def refresh_kylin_properties_in_clusters(cluster_nums: List[int] = None) -> None:
@@ -220,9 +246,11 @@ class Utils:
if filename not in Utils.FILES_SIZE_IN_BYTES.keys():
logger.warning(f'Current file {filename} is not the matched version, skip check file size.')
return
- assert Utils.FILES_SIZE_IN_BYTES[filename] == Utils.size_in_bytes(dest_folder=dest_folder, filename=filename), \
- f'{filename} size should be {Utils.FILES_SIZE_IN_BYTES[filename]} bytes' \
- f'not {Utils.size_in_bytes(dest_folder, filename)} bytes, please check.'
+ expected_size = Utils.FILES_SIZE_IN_BYTES[filename]
+ real_size = Utils.size_in_bytes(dest_folder=dest_folder, filename=filename)
+
+ assert expected_size == real_size, \
+ f'{filename} size should be {expected_size} bytes not {real_size} bytes, please check.'
logger.info(f'Downloaded file {filename} successfully.')
@staticmethod