This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin4_on_cloud
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin4_on_cloud by this push:
     new 3f82213  # minor fix, add progress bar for logging detail
3f82213 is described below

commit 3f822137b08c5d875bb6c50073b117e945ab7748
Author: Mukvin <boyboys...@163.com>
AuthorDate: Sat Apr 2 14:53:53 2022 +0800

    # minor fix, add progress bar for logging detail
---
 clouds/aws.py                            |  2 +-
 engine.py                                |  6 +----
 instances/aws_instance.py                | 25 ++++++++++++++++---
 utils.py => utils/common_utils.py        | 13 ++++++++++
 engine_utils.py => utils/engine_utils.py |  2 +-
 utils/progress_bar.py                    | 43 ++++++++++++++++++++++++++++++++
 6 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/clouds/aws.py b/clouds/aws.py
index 8b944ea..b082c1a 100644
--- a/clouds/aws.py
+++ b/clouds/aws.py
@@ -28,7 +28,7 @@ from constant.kylin_properties_params import KylinProperties
 from constant.path import KYLIN_PROPERTIES_TEMPLATE_DIR
 from constant.yaml_params import Params
 from instances.aws_instance import AWSInstance
-from utils import Utils
+from utils.common_utils import Utils
 
 logger = logging.getLogger(__name__)
 
diff --git a/engine.py b/engine.py
index f43d438..1b717ad 100644
--- a/engine.py
+++ b/engine.py
@@ -16,16 +16,12 @@
 #
 
 import logging
-import os
 from typing import List
 
-import yaml
-
 from constant.client import Client
 from constant.config import Config
 from constant.deployment import ScaleType, NodeType, Cluster
-from constant.yaml_files import File
-from engine_utils import EngineUtils
+from utils.engine_utils import EngineUtils
 
 logger = logging.getLogger(__name__)
 
diff --git a/instances/aws_instance.py b/instances/aws_instance.py
index 002cc8e..f370d95 100644
--- a/instances/aws_instance.py
+++ b/instances/aws_instance.py
@@ -34,7 +34,8 @@ from constant.path import JARS_PATH, TARS_PATH, SCRIPTS_PATH, KYLIN_PROPERTIES_T
 from constant.server_mode import ServerMode
 from constant.yaml_files import File
 from constant.yaml_params import Params
-from utils import Utils
+from utils.common_utils import Utils
+from utils.progress_bar import ProgressBar
 
 logger = logging.getLogger(__name__)
 
@@ -563,6 +564,7 @@ class AWSInstance:
         # FIXME: it's hard code to make sure that zks were already initialized.
         time.sleep(10)
 
+        logger.info(f"Current execute commands in `Zookeeper stack` which 
named {zk_stack}.")
         self.refresh_zks_cfg(zk_ips=zk_ips, zk_ids=zk_ids)
         self.start_zks(zk_ids=zk_ids, zk_ips=zk_ips)
 
@@ -927,6 +929,7 @@ class AWSInstance:
             raise Exception(msg)
 
         start_command = Commands.START_SPARK_MASTER_COMMAND.value
+        logger.info(f"Current execute commands in `Spark master stack` which 
named {self.spark_master_stack_name}.")
         self.exec_script_instance_and_return(name_or_id=spark_master_id, script=start_command)
 
     def get_spark_master_instance_id(self) -> str:
@@ -1183,6 +1186,7 @@ class AWSInstance:
         instance_id = self.get_instance_id(stack_name)
         # spark decommission feature start to be supported in spark 3.1.x.
         # refer: https://issues.apache.org/jira/browse/SPARK-20624.
+        logger.info(f"Current execute commands in `Spark Slave stack` which 
named {stack_name}.")
         try:
             self.exec_script_instance_and_return(
                 name_or_id=instance_id, script=Commands.SPARK_DECOMMISION_WORKER_COMMAND.value)
@@ -1206,11 +1210,13 @@ class AWSInstance:
     def start_prometheus_server(self) -> None:
         start_command = Commands.START_PROMETHEUS_COMMAND.value
         instance_id = self.instance_id_of_static_services
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         self.exec_script_instance_and_return(name_or_id=instance_id, script=start_command)
 
     def stop_prometheus_server(self) -> None:
         stop_command = Commands.STOP_PROMETHEUS_COMMAND.value
         instance_id = self.instance_id_of_static_services
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         self.exec_script_instance_and_return(name_or_id=instance_id, script=stop_command)
 
     def restart_prometheus_server(self) -> None:
@@ -1225,11 +1231,13 @@ class AWSInstance:
             raise Exception(f'Current static services stack was not create complete, please check.')
 
         refresh_config_commands = self.refresh_prometheus_commands()
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in refresh_config_commands:
             self.exec_script_instance_and_return(name_or_id=static_services_id, script=command)
 
         # Special support spark metrics into prometheus
         spark_config_commands = self.refresh_spark_metrics_commands()
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in spark_config_commands:
             self.exec_script_instance_and_return(name_or_id=static_services_id, script=command)
 
@@ -1294,12 +1302,14 @@ class AWSInstance:
     def is_prometheus_configured(self, host: str) -> bool:
         static_services_instance_id = self.instance_id_of_static_services
         check_command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host)
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         output = self.exec_script_instance_and_return(name_or_id=static_services_instance_id, script=check_command)
         return output['StandardOutputContent'] == '0\n'
 
     def get_prometheus_configured_hosts(self, hosts: List) -> List:
         static_services_instance_id = self.instance_id_of_static_services
         configured_hosts = []
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for host in hosts:
             check_command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=host)
             output = self.exec_script_instance_and_return(name_or_id=static_services_instance_id, script=check_command)
@@ -1359,6 +1369,7 @@ class AWSInstance:
 
     def _check_prometheus_exists_nodes(self, params: Dict) -> Dict:
         exists_nodes: Dict = {}
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for k, v in params.items():
             command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k)
             output = self.exec_script_instance_and_return(
@@ -1370,6 +1381,7 @@ class AWSInstance:
 
     def _check_prometheus_not_exists_nodes(self, params: Dict) -> Dict:
         not_exists_nodes: Dict = {}
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for k, v in params.items():
             command = Commands.PROMETHEUS_CFG_CHECK_COMMAND.value.format(node=k)
             output = self.exec_script_instance_and_return(
@@ -1392,16 +1404,19 @@ class AWSInstance:
 
     def refresh_prometheus_config_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_commands_after_scale(expected_nodes)
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_driver_of_kylin_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_spark_driver_of_kylin_after_scale(expected_nodes)
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_metrics_of_kylin_in_cluster_after_scale_up(self, expected_nodes: Dict) -> None:
         commands = self.refresh_prometheus_spark_driver_of_kylin_in_cluster_after_scale(expected_nodes)
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
@@ -1410,12 +1425,14 @@ class AWSInstance:
 
     def refresh_prometheus_config_after_scale_down(self, exists_nodes: Dict) -> None:
         commands = [Commands.PROMETHEUS_DELETE_CFG_COMMAND.value.format(node=worker) for worker in exists_nodes.keys()]
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
     def refresh_prometheus_spark_driver_of_kylin_after_scale_down(self, exists_nodes: Dict) -> None:
         commands = [Commands.SPARK_DRIVER_METRIC_OF_KYLIN_DELETE_CFG_COMMAND.value.format(node=worker)
                     for worker in exists_nodes.keys()]
+        logger.info(f"Current execute commands in `static service stack` which 
named {self.static_service_stack_name}.")
         for command in commands:
             self.exec_script_instance_and_return(name_or_id=self.instance_id_of_static_services, script=command)
 
@@ -1935,6 +1952,7 @@ class AWSInstance:
         vm_name = None
         if isinstance(name_or_id, str):
             vm_name = [name_or_id]
+        logger.info(f"Current instance id: {name_or_id} is executing commands: 
{script}.")
         response = self.send_command(vm_name=vm_name, script=script)
         command_id = response['Command']['CommandId']
         time.sleep(5)
@@ -2134,8 +2152,9 @@ class AWSInstance:
 
     def upload_file_to_s3(self, local_file_dir: str, filename: str, bucket: str, bucket_dir: str) -> None:
         logger.info(f'Uploading {filename} from {local_file_dir} to S3 bucket: {bucket}/{bucket_dir}.')
-        self.s3_client.upload_file(os.path.join(local_file_dir, filename), bucket, bucket_dir + filename)
-        logger.info(f'Uploaded {filename} successfully.')
+        full_filename = os.path.join(local_file_dir, filename)
+        self.s3_client.upload_file(full_filename, bucket, bucket_dir + filename, Callback=ProgressBar(full_filename))
+        logger.info(f'\nUploaded {filename} successfully.')
 
     def is_stack_deleted_complete(self, stack_name: str) -> bool:
         if self.is_stack_create_complete(stack_name):
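
For context: boto3's upload_file() accepts an optional Callback argument and invokes it repeatedly from its transfer threads with the number of bytes moved since the previous call, which is why the patch can pass a callable ProgressBar instance. A minimal sketch of the hook, independent of this repo ('example.bin', 'my-bucket', and the key are hypothetical placeholders):

    import boto3

    def report(bytes_amount: int) -> None:
        # boto3 calls this with the bytes transferred since the last call.
        print(f'transferred another {bytes_amount} bytes')

    s3 = boto3.client('s3')
    # Placeholder file, bucket, and key, for illustration only.
    s3.upload_file('example.bin', 'my-bucket', 'uploads/example.bin', Callback=report)
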
diff --git a/utils.py b/utils/common_utils.py
similarity index 95%
rename from utils.py
rename to utils/common_utils.py
index 833d4de..14e31f4 100644
--- a/utils.py
+++ b/utils/common_utils.py
@@ -18,6 +18,8 @@
 import logging
 import os
 import shutil
+import sys
+from datetime import datetime
 from typing import List, Tuple, Generator, Dict
 
 import requests
@@ -117,10 +119,21 @@ class Utils:
             return
         logger.info(f"Downloading {os.path.abspath(file_path)}.")
         with open(file_path, 'wb') as f:
+            # set up the download progress bar
+            total_length = int(r.headers.get('content-length'))
+            temp_done = 0
+            start = datetime.now()
             for chunk in r.iter_content(chunk_size=1024 * 8):
                 if not chunk:
                     break
+
+                temp_done += len(chunk)
                 f.write(chunk)
+                end = datetime.now()
+                done = int(50 * temp_done / total_length)
+                sys.stdout.write(f"\r[{'=' * done}{' '* (50 - done)}] % 
{temp_done} / {total_length} "
+                                 f"- Duration: {end - start}\r")
+                sys.stdout.flush()
                 f.flush()
                 os.fsync(f.fileno())
 
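An aside on the bar arithmetic in the hunk above: done = int(50 * temp_done / total_length) maps the bytes received onto a fixed 50-character track, and the leading '\r' moves the cursor back to the start of the line so each write redraws the bar in place. A self-contained sketch of the same rendering technique, with made-up numbers:

    import sys
    import time

    total = 1000          # pretend content-length, in bytes
    received = 0
    while received < total:
        received = min(received + 128, total)  # pretend a chunk arrived
        done = int(50 * received / total)      # same 50-char scaling as the patch
        sys.stdout.write(f"\r[{'=' * done}{' ' * (50 - done)}] {received} / {total}")
        sys.stdout.flush()
        time.sleep(0.05)
    print()
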
diff --git a/engine_utils.py b/utils/engine_utils.py
similarity index 99%
rename from engine_utils.py
rename to utils/engine_utils.py
index 5f432e7..4f6ff52 100644
--- a/engine_utils.py
+++ b/utils/engine_utils.py
@@ -23,7 +23,7 @@ from constant.config import Config
 from constant.deployment import NodeType, ScaleType
 from constant.yaml_files import Tar
 from instances.kylin_utils import KylinUtils
-from utils import Utils
+from utils.common_utils import Utils
 
 logger = logging.getLogger(__name__)
 
diff --git a/utils/progress_bar.py b/utils/progress_bar.py
new file mode 100644
index 0000000..e2051b1
--- /dev/null
+++ b/utils/progress_bar.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import threading
+from datetime import datetime
+
+
+class ProgressBar(object):
+
+    def __init__(self, filename):
+        self._filename = filename
+        self._size = int(os.path.getsize(filename))
+        self._seen_so_far = 0
+        self._lock = threading.Lock()
+        self._done = 0
+        self._start = datetime.now()
+        self._end = None
+
+    def __call__(self, bytes_amount):
+        # To simplify, assume this is hooked up to a single filename
+        with self._lock:
+            self._end = datetime.now()
+            self._seen_so_far += bytes_amount
+            self._done = int(50 * self._seen_so_far / self._size)
+            sys.stdout.write(f"\r[{'=' * self._done}{' ' * (50 - self._done)}] 
% {self._seen_so_far} / {self._size} "
+                             f"- Duration: {self._end - self._start}\r")
+            sys.stdout.flush()

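Note that boto3 may fire the callback from several threads during a multipart upload, which is why ProgressBar serializes its counter updates behind a threading.Lock. A minimal usage sketch, assuming a local file 'data.tar.gz' exists (the path is hypothetical):

    from utils.progress_bar import ProgressBar

    bar = ProgressBar('data.tar.gz')  # sizes the bar from the file on disk
    bar(1024 * 1024)                  # simulate boto3 reporting 1 MiB transferred
    bar(1024 * 1024)                  # each call redraws the in-place progress line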