This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 036ef5b Add a tool to show segment status (#2260) 036ef5b is described below commit 036ef5bcb991658bb5187e81d91a66dfeaeddda3 Author: LingBin <lingbi...@gmail.com> AuthorDate: Tue Nov 26 21:35:16 2019 -0600 Add a tool to show segment status (#2260) In order to be aware of the convert process from AlphaRowset to BetaRowset, we need a mechanism to know the process of convert. --- tools/show_segment_status/README.md | 71 +++++++ tools/show_segment_status/be_tablet_reslover.py | 110 ++++++++++ tools/show_segment_status/conf | 14 ++ tools/show_segment_status/fe_meta_resolver.py | 245 +++++++++++++++++++++++ tools/show_segment_status/show_segment_status.py | 127 ++++++++++++ 5 files changed, 567 insertions(+) diff --git a/tools/show_segment_status/README.md b/tools/show_segment_status/README.md new file mode 100644 index 0000000..5eacd87 --- /dev/null +++ b/tools/show_segment_status/README.md @@ -0,0 +1,71 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This tool reports how far all current tables have progressed in their transition +(from AlphaRowset to BetaRowset) while the `segment_2` feature is brought online. 
+ +Currently, you can filter the results by 3 dimensions (`db_name`, `table_name` +and `be_id` in the conf file). You can specify one of them individually, or +combine them (that is, specify multiple at the same time). + +# Note +We use the MySQLdb Python library to fetch metadata from FE, so you must install it. + +You can get the MySQLdb library from https://pypi.python.org/pypi/MySQL-python, +and then install it as follows: +``` +$ tar zxvf MySQL-python-*.tar.gz +$ cd MySQL-python-* +$ python setup.py build +$ python setup.py install +``` + +# Steps +1. Fill in the conf according to your cluster configuration, and specify + the table or BE you want to watch. +2. Execute `python show_segment_status.py` + +# Example +1. If you want to watch the progress of a table named `xxxx`, you can specify + `table_name = xxxx` in the conf file + +2. If you want to watch the progress on the BE whose be_id is `xxxx`, you can specify + `be_id = xxxx` in the conf file + +# Output Example Format + +``` + +==========SUMMARY()=========== +rowset_count: 289845 / 289845 +rowset_disk_size: 84627551189 / 84627551189 +rowset_row_count: 1150899153 / 1150899153 +=========================================================== +==========SUMMARY(table=xxxx)=========== +rowset_count: 289845 / 289845 +rowset_disk_size: 84627551189 / 84627551189 +rowset_row_count: 1150899153 / 1150899153 +=========================================================== +==========SUMMARY(be=10003 )=========== +rowset_count: 79650 / 79650 +rowset_disk_size: 24473921575 / 24473921575 +rowset_row_count: 331449328 / 331449328 +=========================================================== + +``` diff --git a/tools/show_segment_status/be_tablet_reslover.py b/tools/show_segment_status/be_tablet_reslover.py new file mode 100644 index 0000000..b6f15dd --- /dev/null +++ b/tools/show_segment_status/be_tablet_reslover.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor 
license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +import time +from urllib import urlopen + +class BeTabletResolver: + def __init__(self, be_list, tablet_map): + self.tablet_map = tablet_map + self.tablet_infos = {} + + self.be_map = {} + for be in be_list: + self.be_map[be['be_id']] = be + + def debug_output(self): + print "tablet_infos:(%s), print up to ten here:" % len(self.tablet_infos) + self._print_list(self.tablet_infos.values()[0:10]) + print + + def _print_list(self, one_list): + for item in one_list: + print item + + def init(self): + self.fetch_tablet_meta() + + def fetch_tablet_meta(self): + print "fetching tablet metas from BEs..." + count = 0 + for tablet in self.tablet_map.values(): + be_id = tablet['be_id'] + be = self.be_map[be_id] + url = self._make_url(be, tablet) + print url + tablet_meta = self._fetch_tablet_meta_by_id(url) + self._decode_rs_metas_of_tablet(tablet_meta) + # slow down, do not need too fast + count += 1 + if count % 10 == 0: + time.sleep(0.005) + print "finished. 
\n" + return + + def _make_url(self, be, tablet): + url_list = [] + url_list.append("http://") + url_list.append(be["ip"]) + url_list.append(":") + url_list.append(be["http_port"]) + url_list.append("/api/meta/header/") + url_list.append(str(tablet["tablet_id"])) + url_list.append("/") + url_list.append(str(tablet["schema_hash"])) + return "".join(url_list) + + def _fetch_tablet_meta_by_id(self, url): + tablet_meta = urlopen(url).read() + tablet_meta = json.loads(tablet_meta) + return tablet_meta + + def _decode_rs_metas_of_tablet(self, tablet_meta): + # When something wrong, may do not have rs_metas attr, so use 'get()' instead of '[]' + rs_metas = tablet_meta.get('rs_metas') + if rs_metas is None: + return + size = len(rs_metas) + + rowsets = [] + for rs_meta in rs_metas: + rowset = {} + rowset['tablet_id'] = rs_meta['tablet_id'] + rowset['num_rows'] = rs_meta['num_rows'] + rowset['data_disk_size'] = rs_meta['data_disk_size'] + if rs_meta['rowset_type'] == 'BETA_ROWSET': + rowset['is_beta'] = True + else: + rowset['is_beta'] = False + rowsets.append(rowset); + + self.tablet_infos[rs_meta['tablet_id']] = rowsets + return + + def get_rowsets_by_tablet(self, tablet_id): + return self.tablet_infos.get(tablet_id) + + def get_all_rowsets(self): + return self.tablet_infos.values() + +if __name__ == '__main__': + main() + diff --git a/tools/show_segment_status/conf b/tools/show_segment_status/conf new file mode 100644 index 0000000..24cf36f --- /dev/null +++ b/tools/show_segment_status/conf @@ -0,0 +1,14 @@ +[cluster] +fe_host = +query_port = +user = root +query_pwd = + +# Following confs are optional +# select one database +db_name = +# select one table +table_name = +# select one be. 
when value is 0 means all bes +be_id = 0 + diff --git a/tools/show_segment_status/fe_meta_resolver.py b/tools/show_segment_status/fe_meta_resolver.py new file mode 100644 index 0000000..7b843b4 --- /dev/null +++ b/tools/show_segment_status/fe_meta_resolver.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import MySQLdb + +# NOTE: The default organization of meta info is cascading, we flatten its structure +# NOTE: We get schema-hash from proc '/dbs/db_id/tbl_id/'index_schema' +class FeMetaResolver: + def __init__(self, fe_host, query_port, user, query_pwd): + self.fe_host = fe_host + self.query_port = query_port + self.user = user + self.query_pwd = query_pwd + + self.db = None + self.cur = None + + self.be_list = [] + self.db_list = [] + # Only base tables, excluding rollups + self.table_list = [] + # All rollups, including base tables + self.rollup_map = {} + self.partition_list = [] + self.index_list = [] + self.tablet_map = {} + + def init(self): + self.connect_mysql() + self.fetch_be_list(); + self.fetch_db_list(); + self.fetch_table_list(); + self.fetch_rollup_map(); + self.fetch_partition_list(); + self.fetch_idx_list(); + self._merge_schema_hash_to_idx_list() + self.fetch_tablet_list(); + self.close() + + def connect_mysql(self): + try: + self.db = MySQLdb.connect(host=self.fe_host, port=self.query_port, + user=self.user, + passwd=self.query_pwd) + self.cur = self.db.cursor() + except MySQLdb.Error as e: + print ("Failed to connect fe server. 
error %s:%s" % (str(e.args[0]), e.args[1])) + exit(-1); + + def exec_sql(self, sql): + try: + self.cur.execute(sql) + except MySQLdb.Error as e: + print ("exec sql error %s:%s" % (str(e.args[0]), e.args[1])) + exit(-1); + + def close(self): + if self.db.open: + self.cur.close() + self.db.close() + + def fetch_be_list(self): + show_be_sql = "show backends" + self.exec_sql(show_be_sql); + be_list = self.cur.fetchall() + for be_tuple in be_list : + be = {} + be['be_id'] = long(be_tuple[0]) + be['ip'] = be_tuple[2] + be['http_port'] = be_tuple[5] + self.be_list.append(be) + + return + + def fetch_db_list(self): + show_database_sql = "show proc \"/dbs\" " + self.exec_sql(show_database_sql); + db_list = self.cur.fetchall() + for db_tuple in db_list : + db = {} + if long(db_tuple[0]) <= 0: + continue + db['db_id'] = long(db_tuple[0]) + db['db_name'] = db_tuple[1] + self.db_list.append(db) + + def fetch_table_list(self): + for db in self.db_list: + self._fetch_tables_by_db(db) + + def _fetch_tables_by_db(self, db): + sql = "show proc \"/dbs/%s\" " % db['db_id'] + self.exec_sql(sql); + table_list = self.cur.fetchall() + for table_tuple in table_list : + table = {} + table['db_id'] = db['db_id'] + table['db_name'] = db['db_name'] + table['tbl_id'] = long(table_tuple[0]) + table['tbl_name'] = table_tuple[1] + self.table_list.append(table) + return + + def fetch_rollup_map(self): + for table in self.table_list: + self._fetch_rollups_by_table(table); + + def _fetch_rollups_by_table(self, table): + sql = "show proc \"/dbs/%s/%s/index_schema\" " % (table['db_id'], table['tbl_id']) + self.exec_sql(sql); + index_list = self.cur.fetchall() + for index_tuple in index_list : + index = {} + index['tbl_id'] = table['tbl_id'] + index['tbl_name'] = table['tbl_name'] + index['idx_id'] = long(index_tuple[0]) + index['schema_hash'] = long(index_tuple[3]) + self.rollup_map[index['idx_id']] = index + return + + def fetch_partition_list(self): + for table in self.table_list: + 
self._fetch_partitions_by_table(table); + + def _fetch_partitions_by_table(self, table): + sql = "show proc \"/dbs/%s/%s/partitions\" " % (table['db_id'], table['tbl_id']) + self.exec_sql(sql); + partition_list = self.cur.fetchall() + for partition_tuple in partition_list : + partition = {} + partition['db_id'] = table['db_id'] + partition['db_name'] = table['db_name'] + partition['tbl_id'] = table['tbl_id'] + partition['tbl_name'] = table['tbl_name'] + partition['partition_id'] = long(partition_tuple[0]) + partition['partition_name'] = partition_tuple[1] + self.partition_list.append(partition) + return + + def fetch_idx_list(self): + for partition in self.partition_list: + self._fetch_idxes_by_partition(partition); + + def _fetch_idxes_by_partition(self, partition): + sql = "show proc \"/dbs/%s/%s/partitions/%s\" " % \ + (partition['db_id'], partition['tbl_id'], partition['partition_id']) + self.exec_sql(sql); + index_list = self.cur.fetchall() + for idx_tuple in index_list : + idx = {} + idx['db_id'] = partition['db_id'] + idx['db_name'] = partition['db_name'] + idx['tbl_id'] = partition['tbl_id'] + idx['tbl_name'] = partition['tbl_name'] + idx['partition_id'] = partition['partition_id'] + idx['partition_name'] = partition['partition_name'] + idx['idx_id'] = long(idx_tuple[0]) + idx['idx_name'] = idx_tuple[1] + idx['idx_state'] = idx_tuple[2] + self.index_list.append(idx) + return + + def _merge_schema_hash_to_idx_list(self): + for index in self.index_list: + idx_id = index['idx_id'] + rollup = self.rollup_map.get(idx_id) + index['schema_hash'] = rollup['schema_hash'] + + def fetch_tablet_list(self): + for index in self.index_list: + self._fetch_tablets_by_index(index); + + def _fetch_tablets_by_index(self, index): + sql = "show proc \"/dbs/%s/%s/partitions/%s/%s\" " % \ + (index['db_id'], index['tbl_id'], index['partition_id'], index['idx_id']) + self.exec_sql(sql); + tablet_list = self.cur.fetchall() + for tablet_tuple in tablet_list : + tablet = {} + 
tablet['db_id'] = index['db_id'] + tablet['db_name'] = index['db_name'] + tablet['tbl_id'] = index['tbl_id'] + tablet['tbl_name'] = index['tbl_name'] + tablet['partition_id'] = index['partition_id'] + tablet['partition_name'] = index['partition_name'] + tablet['idx_id'] = index['idx_id'] + tablet['idx_name'] = index['idx_name'] + tablet['idx_state'] = index['idx_state'] + tablet['tablet_id'] = long(tablet_tuple[0]) + tablet['replica_id'] = long(tablet_tuple[1]) + tablet['be_id'] = long(tablet_tuple[2]) + tablet['schema_hash'] = index["schema_hash"] + self.tablet_map[tablet['tablet_id']] = tablet + return + + def debug_output(self): + print "be_list:" + self._print_list(self.be_list) + print + print "database_list:" + self._print_list(self.db_list) + print + print "table_list:" + self._print_list(self.table_list) + print + print "rollup_list:" + self._print_list(self.rollup_map.values()) + print + print "partition_list:" + self._print_list(self.partition_list) + print + print "index_list:" + self._print_list(self.index_list) + print + print "tablet_map:(%s), print up to ten here:" % len(self.tablet_map) + self._print_list(self.tablet_map.values()[0:10]) + print + + def _print_list(self, one_list): + for item in one_list: + print item + + def get_tablet_by_id(self, tablet_id): + return self.tablet_map.get(tablet_id) + + def get_all_tablets(self): + return self.tablet_map.values() + diff --git a/tools/show_segment_status/show_segment_status.py b/tools/show_segment_status/show_segment_status.py new file mode 100644 index 0000000..04cb85d --- /dev/null +++ b/tools/show_segment_status/show_segment_status.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import ConfigParser +import re +import sys +import os +import json +from urllib import urlopen + +from fe_meta_resolver import FeMetaResolver +from be_tablet_reslover import BeTabletResolver + +class Calc: + def __init__(self, fe_meta, be_resolver): + self.fe_meta = fe_meta + self.be_resolver = be_resolver + + def calc_cluster_summary(self): + self.calc_table_and_be_summary("", "", 0) + return + + def calc_table_summary(self, db_name, table_name): + self.calc_table_and_be_summary(db_name, table_name, 0) + return + + def calc_be_summary(self, be_id): + self.calc_table_and_be_summary("", "", be_id) + return + + def calc_table_and_be_summary(self, db_name, table_name, be_id): + total_rs_count = 0 + beta_rs_count = 0 + total_rs_size = 0 + beta_rs_size = 0 + total_rs_row_count = 0 + beta_rs_row_count = 0 + + for tablet in self.fe_meta.get_all_tablets(): + # The db_name from meta contain cluster name, so use 'in' here + if len(db_name) != 0 and (not (db_name in tablet['db_name'])): + continue + if len(table_name) != 0 and (tablet['tbl_name'] != table_name): + continue; + if be_id != 0 and tablet['be_id'] != be_id: + continue + rowsets = self.be_resolver.get_rowsets_by_tablet(tablet['tablet_id']) + # If tablet has gone away, ignore it + if rowsets is None: + continue + for tablet_info in rowsets: + total_rs_count += 1 + total_rs_row_count += tablet_info['num_rows'] + total_rs_size += 
tablet_info['data_disk_size'] + if tablet_info['is_beta']: + beta_rs_count += 1 + beta_rs_size += tablet_info['data_disk_size'] + beta_rs_row_count += tablet_info['num_rows'] + + content_str = "" + if len(db_name) != 0: + content_str += ("db=%s " % db_name) + if len(table_name) != 0: + content_str += ("table=%s " % table_name) + if be_id != 0: + content_str += ("be=%s " % be_id) + print "==========SUMMARY(%s)===========" % (content_str) + print "rowset_count: %s / %s" % (beta_rs_count, total_rs_count) + print "rowset_disk_size: %s / %s" % (beta_rs_size, total_rs_size) + print "rowset_row_count: %s / %s" % (beta_rs_row_count, total_rs_row_count) + print "===========================================================" + return; + +def main(): + cf = ConfigParser.ConfigParser() + cf.read("./conf") + fe_host = cf.get('cluster', 'fe_host') + query_port = int(cf.get('cluster', 'query_port')) + user = cf.get('cluster', 'user') + query_pwd = cf.get('cluster', 'query_pwd') + + db_name = cf.get('cluster', 'db_name') + table_name = cf.get('cluster', 'table_name') + be_id = cf.getint('cluster', 'be_id') + + print "============= CONF =============" + print "fe_host =", fe_host + print "fe_query_port =", query_port + print "user =", user + print "db_name =", db_name + print "table_name =", table_name + print "be_id =", be_id + print "====================================" + + fe_meta = FeMetaResolver(fe_host, query_port, user, query_pwd) + fe_meta.init() + fe_meta.debug_output() + + be_resolver = BeTabletResolver(fe_meta.be_list, fe_meta.tablet_map) + be_resolver.init() + be_resolver.debug_output() + + calc = Calc(fe_meta, be_resolver) + calc.calc_cluster_summary() + calc.calc_table_summary(db_name, table_name); + calc.calc_be_summary(be_id); + +if __name__ == '__main__': + main() + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org