This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
     new 3f82213  # minor fix, add progress bar for logging detail
3f82213 is described below

commit 3f822137b08c5d875bb6c50073b117e945ab7748
Author: Mukvin <boyboys...@163.com>
AuthorDate: Sat Apr 2 14:53:53 2022 +0800

    # minor fix, add progress bar for logging detail
---
 clouds/aws.py                            |  2 +-
 engine.py                                |  6 +----
 instances/aws_instance.py                | 25 ++++++++++++++++---
 utils.py => utils/common_utils.py        | 13 ++++++++++
 engine_utils.py => utils/engine_utils.py |  2 +-
 utils/progress_bar.py                    | 43 ++++++++++++++++++++++++++++++++
 6 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/clouds/aws.py b/clouds/aws.py
index 8b944ea..b082c1a 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -28,7 +28,7 @@ from constant.kylin_properties_params import KylinProperties
 from constant.path import KYLIN_PROPERTIES_TEMPLATE_DIR
 from constant.yaml_params import Params
 from instances.aws_instance import AWSInstance
-from utils import Utils
+from utils.common_utils import Utils
 
 logger = logging.getLogger(__name__)
 
diff --git a/engine.py b/engine.py
index f43d438..1b717ad 100644
--- a/engine.py
+++ b/engine.py
@@ -16,16 +16,12 @@
 #
 
 import logging
-import os
 from typing import List
 
-import yaml
-
 from constant.client import Client
 from constant.config import Config
 from constant.deployment import ScaleType, NodeType, Cluster
-from constant.yaml_files import File
-from engine_utils import EngineUtils
+from utils.engine_utils import EngineUtils
 
 logger = logging.getLogger(__name__)
 
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index 002cc8e..f370d95 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -34,7 +34,8 @@ from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_T
 from constant.server_mode import ServerMode
 from constant.yaml_files import File
 from constant.yaml_params import Params
-from utils import Utils
+from utils.common_utils import Utils
+from utils.progress_bar import ProgressBar
 
 logger = logging.getLogger(__name__)
 
@@ -563,6 +564,7 @@ class AWSInstance:
 
         # FIXME: it's hard code to make sure that zks were already initialized.
         time.sleep(10)
+        logger.info(f"Executing commands in `Zookeeper stack` named {zk_stack}.")
         self.refresh_zks_cfg(zk_ips=zk_ips, zk_ids=zk_ids)
         self.start_zks(zk_ids=zk_ids, zk_ips=zk_ips)
 
@@ -927,6 +929,7 @@ class AWSInstance:
             raise Exception(msg)
 
         start_command = Commands.START_SPARK_MASTER_COMMAND.value
+        logger.info(f"Executing commands in `Spark master stack` named {self.spark_master_stack_name}.")
         self.exec_script_instance_and_return(name_or_id=spark_master_id, script=start_command)
 
     def get_spark_master_instance_id(self) -> str:
@@ -1183,6 +1186,7 @@ class AWSInstance:
         instance_id = self.get_instance_id(stack_name)
         # spark decommission feature start to be supported in spark 3.1.x.
         # refer: https://issues.apache.org/jira/browse/SPARK-20624.
+ logger.info(f"Current execute commands in `Spark Slave stack` which named {stack_name}.") try: self.exec_script_instance_and_return( name_or_id=instance_id, script=Commands.SPARK_DECOMMISION_WORKER_COMMAND.value) @@ -1206,11 +1210,13 @@ class AWSInstance: def start_prometheus_server(self) -> None: start_command = Commands.START_PROMETHEUS_COMMAND.value instance_id = self.instance_id_of_static_services + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") self.exec_script_instance_and_return(name_or_id=instance_id, script=start_command) def stop_prometheus_server(self) -> None: stop_command = Commands.STOP_PROMETHEUS_COMMAND.value instance_id = self.instance_id_of_static_services + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") self.exec_script_instance_and_return(name_or_id=instance_id, script=stop_command) def restart_prometheus_server(self) -> None: @@ -1225,11 +1231,13 @@ class AWSInstance: raise Exception(f'Current static services stack was not create complete, please check.') refresh_config_commands = self.refresh_prometheus_commands() + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") for command in refresh_config_commands: self.exec_script_instance_and_return(name_or_id=static_services_id, script=command) # Special support spark metrics into prometheus spark_config_commands = self.refresh_spark_metrics_commands() + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") for command in spark_config_commands: self.exec_script_instance_and_return(name_or_id=static_services_id, script=command) @@ -1294,12 +1302,14 @@ class AWSInstance: def is_prometheus_configured(self, host: str) -> bool: static_services_instance_id = self.instance_id_of_static_services check_command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host) + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") output = self.exec_script_instance_and_return(name_or_id=static_services_instance_id, script=check_command) return output['StandardOutputContent'] == '0\n' def get_prometheus_configured_hosts(self, hosts: List) -> List: static_services_instance_id = self.instance_id_of_static_services configured_hosts = [] + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") for host in hosts: check_command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host) output = self.exec_script_instance_and_return(name_or_id=static_services_instance_id, script=check_command) @@ -1359,6 +1369,7 @@ class AWSInstance: def _check_prometheus_exists_nodes(self, params: Dict) -> Dict: exists_nodes: Dict = {} + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") for k, v in params.items(): command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k) output = self.exec_script_instance_and_return( @@ -1370,6 +1381,7 @@ class AWSInstance: def _check_prometheus_not_exists_nodes(self, params: Dict) -> Dict: not_exists_nodes: Dict = {} + logger.info(f"Current execute commands in `static service stack` which named {self.static_service_stack_name}.") for k, v in params.items(): command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k) output = self.exec_script_instance_and_return( @@ -1392,16 
 
     def refresh_prometheus_config_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_commands_after_scale(expected_nodes)
+        logger.info(f"Executing commands in `static service stack` named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_driver_of_kylin_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_spark_driver_of_kylin_after_scale(expected_nodes)
+        logger.info(f"Executing commands in `static service stack` named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_metrics_of_kylin_in_cluster_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_spark_driver_of_kylin_in_cluster_after_scale(expected_nodes)
+        logger.info(f"Executing commands in `static service stack` named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
@@ -1410,12 +1425,14 @@ class AWSInstance:
 
     def refresh_prometheus_config_after_scale_down(self, exists_nodes: Dict) -> None:
         commands = [Commands.PROMETHEUS_DELETE_CFG_COMMAND.value.format(node=worker) for worker in exists_nodes.keys()]
+        logger.info(f"Executing commands in `static service stack` named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_driver_of_kylin_after_scale_down(self, exists_nodes: Dict) -> None:
         commands = [Commands.SPARK_DRIVER_METRIC_OF_KYLIN_DELETE_CFG_COMMAND.value.format(node=worker)
                     for worker in exists_nodes.keys()]
+        logger.info(f"Executing commands in `static service stack` named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
@@ -1935,6 +1952,7 @@ class AWSInstance:
         vm_name = None
         if isinstance(name_or_id, str):
             vm_name = [name_or_id]
+        logger.info(f"Instance {name_or_id} is executing commands: {script}.")
         response = self.send_command(vm_name=vm_name, script=script)
         command_id = response['Command']['CommandId']
         time.sleep(5)
@@ -2134,8 +2152,9 @@ class AWSInstance:
 
     def upload_file_to_s3(self, local_file_dir: str, filename: str, bucket: str, bucket_dir: str) -> None:
         logger.info(f'Uploading {filename} from {local_file_dir} to S3 bucket: {bucket}/{bucket_dir}.')
-        self.s3_client.upload_file(os.path.join(local_file_dir, filename), bucket, bucket_dir + filename)
-        logger.info(f'Uploaded {filename} successfully.')
+        full_filename = os.path.join(local_file_dir, filename)
+        self.s3_client.upload_file(full_filename, bucket, bucket_dir + filename, Callback=ProgressBar(full_filename))
+        logger.info(f'\nUploaded {filename} successfully.')
 
     def is_stack_deleted_complete(self, stack_name: str) -> bool:
         if self.is_stack_create_complete(stack_name):
diff --git a/utils.py b/utils/common_utils.py
similarity index 95%
rename from utils.py
rename to utils/common_utils.py
index 833d4de..14e31f4 100644
--- a/utils.py
+++ b/utils/common_utils.py
@@ -18,6 +18,8 @@
 import logging
 import os
 import shutil
+import sys
+from datetime import datetime
 from typing import List, Tuple, Generator, Dict
 
 import requests
@@ -117,10 +119,21 @@ class Utils:
             return
         logger.info(f"Downloading {os.path.abspath(file_path)}.")
         with open(file_path, 'wb') as f:
+            # set downloading bar
+            total_length = int(r.headers.get('content-length'))
+            temp_done = 0
+            start = datetime.now()
             for chunk in r.iter_content(chunk_size=1024 * 8):
                 if not chunk:
                     break
+
+                temp_done += len(chunk)
                 f.write(chunk)
+                end = datetime.now()
+                done = int(50 * temp_done / total_length)
+                sys.stdout.write(f"\r[{'=' * done}{' '* (50 - done)}] % {temp_done} / {total_length} "
+                                 f"- Duration: {end - start}\r")
+                sys.stdout.flush()
             f.flush()
             os.fsync(f.fileno())
 
diff --git a/engine_utils.py b/utils/engine_utils.py
similarity index 99%
rename from engine_utils.py
rename to utils/engine_utils.py
index 5f432e7..4f6ff52 100644
--- a/engine_utils.py
+++ b/utils/engine_utils.py
@@ -23,7 +23,7 @@ from constant.config import Config
 from constant.deployment import NodeType, ScaleType
 from constant.yaml_files import Tar
 from instances.kylin_utils import KylinUtils
-from utils import Utils
+from utils.common_utils import Utils
 
 logger = logging.getLogger(__name__)
 
diff --git a/utils/progress_bar.py b/utils/progress_bar.py
new file mode 100644
index 0000000..e2051b1
--- /dev/null
+++ b/utils/progress_bar.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import threading
+from datetime import datetime
+
+
+class ProgressBar(object):
+
+    def __init__(self, filename):
+        self._filename = filename
+        self._size = int(os.path.getsize(filename))
+        self._seen_so_far = 0
+        self._lock = threading.Lock()
+        self._done = 0
+        self._start = datetime.now()
+        self._end = None
+
+    def __call__(self, bytes_amount):
+        # To simplify, assume this is hooked up to a single filename
+        with self._lock:
+            self._end = datetime.now()
+            self._seen_so_far += bytes_amount
+            self._done = int(50 * self._seen_so_far / self._size)
+            sys.stdout.write(f"\r[{'=' * self._done}{' ' * (50 - self._done)}] % {self._seen_so_far} / {self._size} "
+                             f"- Duration: {self._end - self._start}\r")
+            sys.stdout.flush()
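
For context on how the new class is consumed: boto3's S3 client method upload_file accepts a Callback that is invoked with the number of bytes transferred for each chunk, which is what ProgressBar.__call__ expects; that is how the changed upload_file_to_s3 above wires it in. A minimal usage sketch follows. The bucket name, key, and local path are placeholders for illustration only, not values taken from this repository.

    import boto3

    from utils.progress_bar import ProgressBar

    # Placeholder values, assumed for this sketch.
    local_file = '/tmp/example-package.tar.gz'
    bucket = 'my-example-bucket'
    key = 'tars/example-package.tar.gz'

    s3_client = boto3.client('s3')
    # boto3 calls ProgressBar(local_file) with the bytes sent for each chunk,
    # so the 50-character bar is redrawn in place as the upload proceeds.
    s3_client.upload_file(local_file, bucket, key, Callback=ProgressBar(local_file))

ProgressBar reads the file size with os.path.getsize in its constructor, so the local file must exist before the upload starts, and the threading.Lock is there because boto3's transfer manager may invoke the callback from several worker threads.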