This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
     new 8a93937  # adapt initialized config for default aws account limit
8a93937 is described below

commit 8a93937dd800de2dd98caf8b75a2a876bea33920
Author: Mukvin <boyboys...@163.com>
AuthorDate: Mon Feb 7 19:49:11 2022 +0800

    # adapt initialized config for default aws account limit
---
 .gitignore                                         |  4 +-
 bin/init.sh                                        | 20 +++--
 .../ec2-cluster-kylin4-template.yaml               |  4 +-
 cloudformation_templates/ec2-cluster-kylin4.yaml   |  4 +-
 .../ec2-cluster-spark-master.yaml                  |  4 +-
 .../ec2-cluster-spark-slave-template.yaml          | 13 +++-
 .../ec2-cluster-spark-slave.yaml                   |  4 +-
 .../ec2-cluster-static-services.yaml               |  4 +-
 cloudformation_templates/ec2-cluster-zk.yaml       |  4 +-
 clouds/aws.py                                      | 14 +++-
 engine.py                                          |  2 +-
 engine_utils.py                                    | 10 +--
 instances/aws_instance.py                          | 39 ++++++----
 kylin_configs.yaml                                 |  7 +-
 utils.py                                           | 85 +++++++++++++++++-----
 15 files changed, 153 insertions(+), 65 deletions(-)

diff --git a/.gitignore b/.gitignore
index 87514ad..dbc5998 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,4 +6,6 @@ __pycache__/
 */.DS_Store
 /venv/
 .idea
-logs/*.log
\ No newline at end of file
+logs/*.log
+backup/jars
+backup/tars
\ No newline at end of file
diff --git a/bin/init.sh b/bin/init.sh
index 5eb5c68..619d0ba 100755
--- a/bin/init.sh
+++ b/bin/init.sh
@@ -61,17 +61,27 @@ function check_python_version {

 function install_env {
   check_python_version
-  if [[ ! -d $TOOL_HOME/venv ]]; then
+  if [[ ! -f $TOOL_HOME/venv/.installed ]]; then
+    # If a previous venv install did not finish but the directory exists, remove it first.
+    if [[ -d $TOOL_HOME/venv ]]; then
+      logging warn "Deleting leftover venv ..."
+      rm -rf $TOOL_HOME/venv
+      logging warn "Deleted leftover venv successfully."
+    fi
+
     logging info "Initializing venv ..."
     python3 -m venv $TOOL_HOME/venv
     source $TOOL_HOME/venv/bin/activate
-    logging info "Install dependencies ..."
+    logging info "Installing dependencies ..."
     pip3 install -r $TOOL_HOME/requirements.txt
+    logging info "Installed dependencies successfully."
+    touch $TOOL_HOME/venv/.installed
+    logging info "Initialized $TOOL_HOME/venv successfully."
   else
     logging warn "$TOOL_HOME/.venv already existing, skip install again."
   fi
-  logging info "Please use 'source venv/bin/activate' to activate venv and execute commands."
-  logging info "Please use 'python deploy.py --help' to check the usage."
-  logging info "Enjoy it and have fun."
+  logging warn "Please use 'source venv/bin/activate' to activate venv and execute commands."
+  logging warn "Please use 'python deploy.py --help' to check the usage."
+  logging info "Please enjoy it and have fun."
 }

 function main {
diff --git a/cloudformation_templates/ec2-cluster-kylin4-template.yaml b/cloudformation_templates/ec2-cluster-kylin4-template.yaml
index 9ccd13b..5278ab3 100644
--- a/cloudformation_templates/ec2-cluster-kylin4-template.yaml
+++ b/cloudformation_templates/ec2-cluster-kylin4-template.yaml
@@ -103,9 +103,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.2xlarge
+    Default: m5.xlarge
     AllowedValues:
-      - m5.2xlarge
+      - m5.xlarge
   Ec2VolumnTypeForKylin4Node:
     Type: String
     Default: gp2
diff --git a/cloudformation_templates/ec2-cluster-kylin4.yaml b/cloudformation_templates/ec2-cluster-kylin4.yaml
index 9d909dd..a186f45 100644
--- a/cloudformation_templates/ec2-cluster-kylin4.yaml
+++ b/cloudformation_templates/ec2-cluster-kylin4.yaml
@@ -102,9 +102,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.2xlarge
+    Default: m5.xlarge
     AllowedValues:
-      - m5.2xlarge
+      - m5.xlarge
   Ec2VolumnTypeForKylin4Node:
     Type: String
     Default: gp2
diff --git a/cloudformation_templates/ec2-cluster-spark-master.yaml b/cloudformation_templates/ec2-cluster-spark-master.yaml
index 66320d1..418ef12 100644
--- a/cloudformation_templates/ec2-cluster-spark-master.yaml
+++ b/cloudformation_templates/ec2-cluster-spark-master.yaml
@@ -84,9 +84,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.2xlarge
+    Default: m5.xlarge
     AllowedValues:
-      - m5.2xlarge
+      - m5.xlarge
   Ec2VolumnTypeForSparkMasterNode:
     Type: String
     Default: gp2
diff --git a/cloudformation_templates/ec2-cluster-spark-slave-template.yaml b/cloudformation_templates/ec2-cluster-spark-slave-template.yaml
index 94c6d4c..0faa939 100644
--- a/cloudformation_templates/ec2-cluster-spark-slave-template.yaml
+++ b/cloudformation_templates/ec2-cluster-spark-slave-template.yaml
@@ -79,6 +79,13 @@ Parameters:
       - m5.xlarge
       - m5.2xlarge
       - m5.4xlarge
+  InstanceTypeForTest:
+    Description: EC2 instance type
+    Type: String
+    ConstraintDescription: Must be a valid EC2 instance type.
+    Default: m5.xlarge
+    AllowedValues:
+      - m5.xlarge
   Ec2VolumnTypeForSlaveNode:
     Type: String
     Default: gp2
@@ -175,7 +182,11 @@ Resources:
             - - Slave Scale
               - !Ref WorkerNum
              - Node for kylin4
-      InstanceType: !Ref InstanceType
+      InstanceType:
+        !If
+        - IsProductMode
+        - !Ref InstanceType
+        - !Ref InstanceTypeForTest
       IamInstanceProfile: !Ref InstanceProfileId
       NetworkInterfaces:
         - DeviceIndex: 0
diff --git a/cloudformation_templates/ec2-cluster-spark-slave.yaml b/cloudformation_templates/ec2-cluster-spark-slave.yaml
index b0f2ef7..2fed70a 100644
--- a/cloudformation_templates/ec2-cluster-spark-slave.yaml
+++ b/cloudformation_templates/ec2-cluster-spark-slave.yaml
@@ -81,9 +81,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.2xlarge
+    Default: m5.xlarge
     AllowedValues:
-      - m5.2xlarge
+      - m5.xlarge
   Ec2VolumnTypeForSlaveNode:
     Type: String
     Default: gp2
diff --git a/cloudformation_templates/ec2-cluster-static-services.yaml b/cloudformation_templates/ec2-cluster-static-services.yaml
index d75baf8..9c214dd 100644
--- a/cloudformation_templates/ec2-cluster-static-services.yaml
+++ b/cloudformation_templates/ec2-cluster-static-services.yaml
@@ -77,9 +77,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.xlarge
+    Default: m5.large
     AllowedValues:
-      - m5.xlarge
+      - m5.large
   Ec2VolumeSizeForStaticServicesNode:
     Type: Number
     Default: 20
diff --git a/cloudformation_templates/ec2-cluster-zk.yaml b/cloudformation_templates/ec2-cluster-zk.yaml
index f8bd47a..fbad62f 100644
--- a/cloudformation_templates/ec2-cluster-zk.yaml
+++ b/cloudformation_templates/ec2-cluster-zk.yaml
@@ -63,9 +63,9 @@ Parameters:
     Description: EC2 instance type
     Type: String
     ConstraintDescription: must be a valid EC2 instance type.
-    Default: m5.xlarge
+    Default: m5.large
     AllowedValues:
-      - m5.xlarge
+      - m5.large
   Ec2VolumeSizeForZookeeperNode:
     Type: Number
     Default: 10
diff --git a/clouds/aws.py b/clouds/aws.py
index d661851..a0dfaa5 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -80,6 +80,18 @@ class AWS:
         return self.cloud_instance.kylin_stack_name

     @property
+    def static_service_stack_name(self) -> str:
+        return self.cloud_instance.static_service_stack_name
+
+    @property
+    def rds_stack_name(self) -> str:
+        return self.cloud_instance.rds_stack_name
+
+    @property
+    def vpc_stack_name(self) -> str:
+        return self.cloud_instance.vpc_stack_name
+
+    @property
     def is_associated_public_ip(self) -> bool:
         return self.config[Params.ASSOSICATED_PUBLIC_IP.value] == 'true'

@@ -152,7 +164,7 @@ class AWS:
             self.cloud_instance.terminate_zk_stack()
         logger.info('Cluster terminated useless nodes.')

-    def destroy_rds_and_vpc(self) -> None:
+    def destroy_monitor_and_rds_and_vpc(self) -> None:
         if not self.is_destroy_all:
             return
         logger.info('Prepare to destroy RDS and VPC and monitor node.')
diff --git a/engine.py b/engine.py
index a8e35d3..ee2f6c0 100644
--- a/engine.py
+++ b/engine.py
@@ -52,7 +52,7 @@ class Engine:
         logger.info('Destroy default Kylin Cluster successfully.')

     def destroy_rds_and_vpc(self) -> None:
-        self.engine_utils.destroy_rds_and_vpc()
+        self.engine_utils.destroy_monitor_and_rds_and_vpc()

     def list_alive_nodes(self) -> None:
         logger.info('Ec2: list alive nodes.')
diff --git a/engine_utils.py b/engine_utils.py
index 2662b5c..97a7099 100644
--- a/engine_utils.py
+++ b/engine_utils.py
@@ -170,8 +170,8 @@ class EngineUtils:
         self.aws.destroy_clusters(cluster_nums=[cluster_num])
         self.aws.restart_prometheus_server()

-    def destroy_rds_and_vpc(self) -> None:
-        self.aws.destroy_rds_and_vpc()
+    def destroy_monitor_and_rds_and_vpc(self) -> None:
+        self.aws.destroy_monitor_and_rds_and_vpc()

     def is_cluster_ready(self) -> bool:
         if self.cloud_address:
@@ -202,12 +202,6 @@ class EngineUtils:
         jars = self.needed_jars()
         for jar in jars:
             Utils.download_jar(jar)
-        if self.config[Config.ENABLE_SOFT_AFFINITY.value] == 'true':
-            assert Utils.files_in_jars() == 4, f"Needed jars must be 4, not {Utils.files_in_jars()}, " \
-                                               f"which contains {jars}."
-        else:
-            assert Utils.files_in_jars() == 2, f"Needed jars must be 2, not {Utils.files_in_jars()}, " \
-                                               f"which contains {jars}."

     def upload_needed_files(self) -> None:
         logger.info("Start to uploading tars.")
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index 53ff742..291bfc2 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -394,8 +394,7 @@ class AWSInstance:
         try:
             self.rds_client.describe_db_instances(DBInstanceIdentifier=self.db_identifier)
         except self.rds_client.exceptions.DBInstanceNotFoundFault as ex:
-            logger.warning(ex.response['Error']['Message'])
-            logger.info(f'Now creating {self.db_identifier}.')
+            logger.warning(f'DB {self.db_identifier} was not found.')
             return False
         return True

@@ -409,14 +408,12 @@ class AWSInstance:
         # update needed params
         params[Params.SUBNET_GROUP_NAME.value] = self.get_subnet_group()
         params[Params.SECURITY_GROUP.value] = self.get_security_group_id()
-        Params[Params.ZONE.value] = self.region + 'b'
+        params[Params.ZONE.value] = self.region + 'b'

         resp = self.create_stack(
             stack_name=self.rds_stack_name,
             file_path=self.path_of_rds_stack,
             params=params,
         )
-        # Make sure that rds create successfully.
-        assert self.is_stack_complete(self.rds_stack_name), f'Rds {self.db_identifier} create failed, please check.'
         return resp

     def terminate_rds_stack(self) -> Optional[Dict]:
@@ -528,7 +525,6 @@ class AWSInstance:
             file_path=self.path_of_zk_stack,
             params=params
         )
-        assert self.is_stack_complete(zk_stack_name), f'{zk_stack_name} create failed, please check.'
         return resp

     def terminate_zk_stack(self) -> Optional[Dict]:
@@ -677,7 +673,6 @@ class AWSInstance:
             file_path=self.path_of_kylin_stack,
             params=params
         )
-        assert self.is_stack_complete(kylin_stack_name), f"Create scaled kylin stack not complete, please check."
         return resp

     def terminate_kylin_stack(self) -> Optional[Dict]:
@@ -811,7 +806,6 @@ class AWSInstance:
             file_path=self.path_of_kylin_scale_stack,
             params=params
         )
-        assert self.is_stack_complete(stack_name)
         return resp

     def scale_down_kylin(self, kylin_num: int, cluster_num: int = None) -> Optional[Dict]:
@@ -889,9 +883,6 @@ class AWSInstance:
             file_path=self.path_of_spark_master_stack,
             params=params
         )
-
-        assert self.is_stack_complete(spark_master_stack_name), \
-            f'Current {spark_master_stack_name} stack not create complete, please check.'
         return resp

     def terminate_spark_master_stack(self) -> Optional[Dict]:
@@ -1135,7 +1126,6 @@ class AWSInstance:
             file_path=self.path_of_spark_slave_scaled_stack,
             params=params
         )
-        assert self.is_stack_complete(stack_name)
         return resp

     def scale_down_worker(self, worker_num: int, cluster_num: int = None) -> Optional[Dict]:
@@ -1453,6 +1443,7 @@ class AWSInstance:
         return resp

     def create_stack(self, stack_name: str, file_path: str, params: Dict, is_capability: bool = False) -> Dict:
+        logger.info(f'Now creating stack: {stack_name}.')
         try:
             if is_capability:
                 resp = self.cf_client.create_stack(
@@ -1470,8 +1461,8 @@ class AWSInstance:
             return resp
         except ParamValidationError as ex:
             logger.error(ex)
-        assert self.is_stack_complete(stack_name=stack_name), \
-            f"Stack {stack_name} not create complete, please check."
+        assert self.is_stack_complete(stack_name=stack_name), f"Stack: {stack_name} did not complete creation, please check."
+        logger.info(f'Created stack: {stack_name} successfully.')

     def delete_stack(self, stack_name: str) -> Dict:
         logger.info(f'Current terminating stack: {stack_name}.')
@@ -2006,10 +1997,14 @@ class AWSInstance:

     def is_object_exists_on_s3(self, filename: str, bucket: str, bucket_dir: str) -> bool:
         try:
-            self.s3_client.head_object(Bucket=bucket, Key=bucket_dir + filename)
+            response = self.s3_client.head_object(Bucket=bucket, Key=bucket_dir + filename)
+            Utils.is_uploaded_success(filename=filename, size_in_bytes=response['ContentLength'])
         except botocore.exceptions.ClientError as ex:
             assert ex.response['Error']['Code'] == '404'
             return False
+        except AssertionError as ex:
+            logger.error(ex)
+            return False
         return True

     def is_s3_directory_exists(self, bucket: str, bucket_dir: str) -> bool:
@@ -2085,9 +2080,23 @@ class AWSInstance:
     def is_stack_delete_complete(self, stack_name: str) -> bool:
         return self._stack_status_check(name_or_id=stack_name, status='DELETE_COMPLETE')

+    def is_stack_rollback_complete(self, stack_name: str) -> bool:
+        return self._stack_status_check(name_or_id=stack_name, status='ROLLBACK_COMPLETE')
+
+    def is_stack_rollback_failed(self, stack_name: str) -> bool:
+        return self._stack_status_check(name_or_id=stack_name, status='ROLLBACK_FAILED')
+
+    def is_stack_rollback_in_progress(self, stack_name: str) -> bool:
+        return self._stack_status_check(name_or_id=stack_name, status='ROLLBACK_IN_PROGRESS')
+
     def is_stack_complete(self, stack_name: str) -> bool:
         if self._stack_complete(stack_name):
             return True
+        # Before returning False, check whether the stack failed to create and rolled back.
+        if self.is_stack_rollback_failed(stack_name=stack_name) \
+                or self.is_stack_rollback_complete(stack_name=stack_name) \
+                or self.is_stack_rollback_in_progress(stack_name=stack_name):
+            raise Exception(f'Current stack: {stack_name} failed to create, please check.')
         return False

     def _validate_spark_worker_scale(self, stack_name: str) -> None:
diff --git a/kylin_configs.yaml b/kylin_configs.yaml
index adf2dbe..161cf76 100644
--- a/kylin_configs.yaml
+++ b/kylin_configs.yaml
@@ -156,7 +156,6 @@ EC2_RDS_PARAMS:
   Ec2Mode: test
   RDSEngine: mysql
   RDSEngineVersion: 5.7.25
-  Zone: cn-northwest-1b

 EC2_STATIC_SERVICES_PARAMS:
   # Note: params details check in related yaml file
@@ -172,10 +171,10 @@ EC2_STATIC_SERVICES_PARAMS:
   DbPassword: *DbPassword
   StaticServicesScriptFileName: prepare-ec2-env-for-static-services.sh

-  Ec2Mode: product
+  Ec2Mode: test
   # followed params is invalid if Ec2Mode(which set in the yaml) is 'test'
-  Ec2InstanceTypeForStaticServices: m5.4xlarge
-  Ec2VolumeSizeForStaticServicesNode: '50'
+  Ec2InstanceTypeForStaticServices: m5.2xlarge
+  Ec2VolumeSizeForStaticServicesNode: '20'
   Ec2VolumnTypeForStaticServicesNode: standard

 EC2_ZOOKEEPERS_PARAMS:
diff --git a/utils.py b/utils.py
index fbe38ca..cbfff62 100644
--- a/utils.py
+++ b/utils.py
@@ -27,7 +27,10 @@ from constant.path import (
     TARS_PATH,
     JARS_PATH,
     TEMPLATES_PATH,
-    KYLIN_PROPERTIES_TEMPLATE_DIR, KYLIN_PROPERTIES_DIR, RENDERED_FILE, PROPERTIES_TEMPLATE_DIR,
+    KYLIN_PROPERTIES_TEMPLATE_DIR,
+    KYLIN_PROPERTIES_DIR,
+    RENDERED_FILE,
+    PROPERTIES_TEMPLATE_DIR,
     TEMPLATE_OF_KYLIN_PROPERTIES,
 )

@@ -37,6 +40,19 @@ logger = logging.getLogger(__name__)
 class Utils:
     DOWNLOAD_BASE_URL = 'https://s3.cn-north-1.amazonaws.com.cn/public.kyligence.io/kylin'

+    FILES_SIZE_IN_BYTES = {
+        'jdk-8u301-linux-x64.tar.gz': 145520298,
+        'apache-kylin-4.0.0-bin-spark3.tar.gz': 198037626,
+        'apache-hive-2.3.9-bin.tar.gz': 286170958,
+        'hadoop-3.2.0.tar.gz': 345625475,
+        'node_exporter-1.3.1.linux-amd64.tar.gz': 9033415,
+        'prometheus-2.31.1.linux-amd64.tar.gz': 73079452,
+        'spark-3.1.1-bin-hadoop3.2.tgz': 228721937,
+        'zookeeper-3.4.13.tar.gz': 37191810,
+        'commons-configuration-1.3.jar': 232915,
+        'mysql-connector-java-5.1.40.jar': 990924,
+    }
+
     @staticmethod
     def generate_nodes(scale_nodes: Tuple) -> List:
         if not scale_nodes:
@@ -61,45 +77,49 @@ class Utils:
         base_url = Utils.DOWNLOAD_BASE_URL + '/tar/'
         url = base_url + filename
         Utils.download(url=url, dest_folder=TARS_PATH, filename=filename)
+        Utils.is_downloaded_success(filename=filename, dest_folder=TARS_PATH)

     @staticmethod
     def download_jar(filename: str) -> None:
         base_url = Utils.DOWNLOAD_BASE_URL + '/jars/'
         url = base_url + filename
         Utils.download(url=url, dest_folder=JARS_PATH, filename=filename)
+        Utils.is_downloaded_success(filename=filename, dest_folder=JARS_PATH)

     @staticmethod
     def download(url: str, dest_folder: str, filename: str) -> None:
-        if not os.path.exists(dest_folder):
+        if not Utils.is_file_exists(dest_folder):
             # create folder if it does not exist
             os.makedirs(dest_folder)

         file_path = os.path.join(dest_folder, filename)
-        if os.path.exists(file_path):
+        if Utils.is_file_exists(file_path):
             logger.info(f'{filename} already exists, skip download it.')
             return

         r = requests.get(url, stream=True)
-        if r.ok:
-            logger.info(f"saving to {os.path.abspath(file_path)}.")
-            with open(file_path, 'wb') as f:
-                for chunk in r.iter_content(chunk_size=1024 * 8):
-                    if chunk:
-                        f.write(chunk)
-                        f.flush()
-                        os.fsync(f.fileno())
-        else:  # HTTP status code 4XX/5XX
+        if not r.ok:
+            # HTTP status code 4XX/5XX
             logger.error("Download failed: status code {}\n{}".format(r.status_code, r.text))
+            return
+        logger.info(f"Downloading {os.path.abspath(file_path)}.")
+        with open(file_path, 'wb') as f:
+            for chunk in r.iter_content(chunk_size=1024 * 8):
+                if not chunk:
+                    break
+                f.write(chunk)
+                f.flush()
+                os.fsync(f.fileno())

     @staticmethod
     def files_in_tar() -> int:
-        if not os.path.exists(TARS_PATH):
+        if not Utils.is_file_exists(TARS_PATH):
             logger.error(f'{TARS_PATH} does exists, please check.')
             return 0
         return sum(1 for _ in Utils.listdir_nohidden(TARS_PATH))

     @staticmethod
     def files_in_jars() -> int:
-        if not os.path.exists(JARS_PATH):
+        if not Utils.is_file_exists(JARS_PATH):
             logger.error(f'{JARS_PATH} does exists, please check.')
             return 0
         return sum(1 for _ in Utils.listdir_nohidden(JARS_PATH))
@@ -120,7 +140,7 @@ class Utils:
         dest_path = os.path.join(search_path, 'kylin.properties')
         rendered_file = os.path.join(search_path, RENDERED_FILE)

-        if os.path.exists(rendered_file):
+        if Utils.is_file_exists(rendered_file):
             logger.info(f'{dest_path} already rendered. Skip render it again.')
             return
@@ -170,13 +190,13 @@ class Utils:
         # refresh default kylin.properties
         default_path = KYLIN_PROPERTIES_TEMPLATE_DIR.format(cluster_num='default')
         mark_file_path = os.path.join(default_path, RENDERED_FILE)
-        if os.path.exists(mark_file_path):
+        if Utils.is_file_exists(mark_file_path):
             logger.info(f'Removing the render file.')
             os.remove(mark_file_path)
             logger.info(f'Removed the render file.')

         kylin_properties = os.path.join(default_path, properties_template)
-        if os.path.exists(kylin_properties):
+        if Utils.is_file_exists(kylin_properties):
             logger.info(f'Removing the render file.')
             os.remove(kylin_properties)
             logger.info(f'Removed the render file.')
@@ -186,3 +206,34 @@ class Utils:
         logger.info(f'Copy template from {template} to {kylin_properties}.')
         shutil.copy(template, kylin_properties)
         logger.info(f'Copy done.')
+
+    @staticmethod
+    def is_file_exists(file_with_full_path: str) -> bool:
+        return os.path.exists(file_with_full_path)
+
+    @staticmethod
+    def is_downloaded_success(filename: str, dest_folder: str) -> None:
+        if filename not in Utils.FILES_SIZE_IN_BYTES.keys():
+            logger.warning(f'Current file {filename} does not match a known version, skip checking its file size.')
+            return
+        assert Utils.FILES_SIZE_IN_BYTES[filename] == Utils.size_in_bytes(dest_folder=dest_folder, filename=filename), \
+            f'{filename} size should be {Utils.FILES_SIZE_IN_BYTES[filename]} bytes, ' \
+            f'not {Utils.size_in_bytes(dest_folder, filename)} bytes, please check.'
+        logger.info(f'Downloaded file {filename} successfully.')
+
+    @staticmethod
+    def size_in_bytes(filename: str, dest_folder: str) -> int:
+        # return file size in bytes
+        return os.path.getsize(os.path.join(dest_folder, filename))
+
+    @staticmethod
+    def is_uploaded_success(filename: str, size_in_bytes: int) -> None:
+        if filename.endswith('.sh'):
+            # .sh scripts are local files, no need to check their size
+            return
+        if filename not in Utils.FILES_SIZE_IN_BYTES.keys():
+            logger.warning(f'Current uploading file: {filename} does not match a known version, skip checking its file size.')
+            return
+        assert Utils.FILES_SIZE_IN_BYTES[filename] == size_in_bytes, \
+            f'Uploaded file {filename} size should be {Utils.FILES_SIZE_IN_BYTES[filename]} bytes, ' \
+            f'not {size_in_bytes} bytes, please check.'
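
Note: the recurring change in utils.py above is a size-pinning pattern — every downloaded or uploaded artifact is compared against a hard-coded byte count before it is trusted, replacing the old count-the-jars asserts. A minimal standalone sketch of the same pattern follows; the names EXPECTED_SIZES and verify_download are illustrative only, not part of the repository:

    import logging
    import os

    logger = logging.getLogger(__name__)

    # Pinned sizes, mirroring Utils.FILES_SIZE_IN_BYTES in utils.py.
    EXPECTED_SIZES = {
        'commons-configuration-1.3.jar': 232915,
        'mysql-connector-java-5.1.40.jar': 990924,
    }

    def verify_download(dest_folder: str, filename: str) -> None:
        # Unknown artifacts are skipped with a warning rather than failed,
        # matching the behaviour of Utils.is_downloaded_success.
        if filename not in EXPECTED_SIZES:
            logger.warning(f'No pinned size for {filename}, skipping the check.')
            return
        actual = os.path.getsize(os.path.join(dest_folder, filename))
        assert actual == EXPECTED_SIZES[filename], \
            f'{filename} should be {EXPECTED_SIZES[filename]} bytes, got {actual}.'

Comparing sizes is far cheaper than hashing and still catches the truncated downloads from flaky links that this commit guards against, though it cannot detect same-size corruption; callers such as is_object_exists_on_s3 treat the AssertionError like a missing file and trigger a re-download or re-upload.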