Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
/*
 * Provenance: this file was reconstructed from the commit notification summarized below. The combined
 * diff has been resolved to the post-merge content of DefaultLoadBalancer.java.
 *
 * Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
 * Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/46585ab5
 * Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/46585ab5
 * Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/46585ab5
 * Branch: refs/heads/1.6.0-SNAPSHOT
 * Commit: 46585ab59beb08fb8fe2f4b08badab5c02dc1814
 * Parents: 44a9166 144d9d5
 * Author: Eric Newton <eric.new...@gmail.com>
 * Authored: Tue Dec 17 15:03:57 2013 -0500
 * Committer: Eric Newton <eric.new...@gmail.com>
 * Committed: Tue Dec 17 15:03:57 2013 -0500
 *
 * .../accumulo/server/master/balancer/DefaultLoadBalancer.java | 3 ++-
 * 1 file changed, 2 insertions(+), 1 deletion(-)
 *
 * http://git-wip-us.apache.org/repos/asf/accumulo/blob/46585ab5/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
 * diff --cc server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
 * index 9b88d74,0000000..1fcab46
 * mode 100644,000000..100644
 * --- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
 * +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
 * @@@ -1,318 -1,0 +1,319 @@@
 */
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.server.master.balancer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;

import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.apache.log4j.Logger;

/**
 * A balancer that evens out the number of online tablets per tablet server, optionally restricted to a single table.
 * Assignments prefer a server on the tablet's last host and otherwise hand out servers from a shuffled list;
 * migrations move tablets from the servers with the most online tablets to the servers with the fewest.
 */
public class DefaultLoadBalancer extends TabletBalancer {

  private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);

  // iterator over a shuffled copy of the known servers; replaced when exhausted or found to be stale
  Iterator<TServerInstance> assignments;
  // if tableToBalance is set, then only balance the given table
  String tableToBalance = null;

  /**
   * Creates a balancer that considers every table.
   */
  public DefaultLoadBalancer() {

  }

  /**
   * Creates a balancer that only counts and migrates tablets of the given table.
   *
   * @param table
   *          the table to balance; compared against the keys of each server's table map
   */
  public DefaultLoadBalancer(String table) {
    tableToBalance = table;
  }

  /**
   * Returns the given servers as a new list in random order.
   */
  List<TServerInstance> randomize(Set<TServerInstance> locations) {
    List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
    Collections.shuffle(result);
    return result;
  }

  /**
   * Picks a server for a tablet.
   *
   * @param locations
   *          the candidate servers
   * @param extent
   *          the tablet being assigned (not consulted by this implementation)
   * @param last
   *          the server that last hosted the tablet, or null
   * @return null if there are no candidates; otherwise a candidate on the same host as {@code last} when one is found,
   *         else the next server from a shuffled rotation of the candidates
   */
  public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
    if (locations.size() == 0)
      return null;

    if (last != null) {
      // Maintain locality
      // NOTE(review): the single-space session id is presumably chosen so that this probe key sorts at or before any
      // real instance at the same location when used with tailMap -- confirm against TServerInstance.compareTo
      String fakeSessionID = " ";
      TServerInstance simple = new TServerInstance(last.getLocation(), fakeSessionID);
      Iterator<TServerInstance> find = locations.tailMap(simple).keySet().iterator();
      if (find.hasNext()) {
        TServerInstance current = find.next();
        if (current.host().equals(last.host()))
          return current;
      }
    }

    // The strategy here is to walk through the locations and hand them back, one at a time
    // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
    if (assignments == null || !assignments.hasNext())
      assignments = randomize(locations.keySet()).iterator();
    TServerInstance result = assignments.next();
    if (!locations.containsKey(result)) {
      // the cached rotation is stale: drop it and answer from a fresh shuffle of the current candidates
      assignments = null;
      return randomize(locations.keySet()).iterator().next();
    }
    return result;
  }

  /**
   * A server paired with its online tablet count and status. Orders by count, then by server.
   */
  static class ServerCounts implements Comparable<ServerCounts> {
    public final TServerInstance server;
    public final int count;
    public final TabletServerStatus status;

    ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
      this.count = count;
      this.server = server;
      this.status = status;
    }

    public int compareTo(ServerCounts obj) {
      // compare explicitly instead of subtracting, which can overflow for extreme values
      if (count != obj.count)
        return count < obj.count ? -1 : 1;
      return server.compareTo(obj.server);
    }
  }

  /**
   * Proposes migrations that move the per-server online tablet counts toward an even split.
   *
   * @param current
   *          the live servers and their status; a null status or null table map counts as zero tablets
   * @param result
   *          receives the proposed migrations
   * @return true if some overloaded server had more tablets to unload than its paired server needed, meaning another
   *         pass is required; false otherwise, including when there are fewer than two servers
   */
  public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
    boolean moreBalancingNeeded = false;
    try {
      // no moves possible
      if (current.size() < 2) {
        return false;
      }

      // Sort by total number of online tablets, per server
      int total = 0;
      ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
      for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
        int serverTotal = 0;
        if (entry.getValue() != null && entry.getValue().tableMap != null) {
          for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
            // The filter must use e.getKey() (a table name); entry.getKey() is a tablet server and would never
            // match tableToBalance.
            if (tableToBalance == null || tableToBalance.equals(e.getKey()))
              serverTotal += e.getValue().onlineTablets;
          }
        }
        totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
        total += serverTotal;
      }

      // order from high to low: index 0 is the most loaded server, the last index the least loaded
      Collections.sort(totals);
      Collections.reverse(totals);
      int even = total / totals.size();
      int numServersOverEven = total % totals.size();

      // Move tablets from the servers with too many to the servers with
      // the fewest but only nominate tablets to move once. This allows us
      // to fill new servers with tablets from a mostly balanced server
      // very quickly. However, it may take several balancing passes to move
      // tablets from one hugely overloaded server to many slightly
      // under-loaded servers.
      int end = totals.size() - 1;
      int movedAlready = 0;
      for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
        ServerCounts tooMany = totals.get(tooManyIndex);
        // the first (total % size) servers are allowed one tablet more than the even share
        int goal = even;
        if (tooManyIndex < numServersOverEven) {
          goal++;
        }
        int needToUnload = tooMany.count - goal;
        ServerCounts tooLittle = totals.get(end);
        int needToLoad = goal - tooLittle.count - movedAlready;
        if (needToUnload < 1 && needToLoad < 1) {
          break;
        }
        if (needToUnload >= needToLoad) {
          // the receiver is filled by this donor; advance to the next least-loaded server
          result.addAll(move(tooMany, tooLittle, needToLoad));
          end--;
          movedAlready = 0;
        } else {
          // the donor cannot fill the receiver; remember what was sent so the next donor only tops it up
          result.addAll(move(tooMany, tooLittle, needToUnload));
          movedAlready += needToUnload;
        }
        if (needToUnload > needToLoad)
          moreBalancingNeeded = true;
      }

    } finally {
      log.debug("balance ended with " + result.size() + " migrations");
    }
    return moreBalancingNeeded;
  }

  /**
   * Holder for a table id and a tablet-count difference. Not referenced elsewhere in this file.
   */
  static class TableDiff {
    int diff;
    String table;

    public TableDiff(int diff, String table) {
      this.diff = diff;
      this.table = table;
    }
  }

  /**
   * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
   *
   * @param tooMuch
   *          the server to take tablets from
   * @param tooLittle
   *          the server to give tablets to
   * @param count
   *          the number of tablets to move; zero or negative yields no migrations
   * @return the proposed migrations, possibly fewer than {@code count} if no tablet could be selected or the online
   *         tablets could not be fetched
   */
  List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {

    List<TabletMigration> result = new ArrayList<TabletMigration>();
    // getMigrations can pass a negative count when the donor is already below its goal; there is nothing to do then
    if (count <= 0)
      return result;

    // online tablets of the donor, fetched lazily per table and shrunk as tablets are nominated
    Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
    // Copy counts so we can update them as we propose migrations
    Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
    Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);

    for (int i = 0; i < count; i++) {
      String table;
      if (tableToBalance == null) {
        // find a table to migrate
        // look for an uneven table count
        int biggestDifference = 0;
        String biggestDifferenceTable = null;
        for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
          String tableID = tableEntry.getKey();
          if (tooLittleMap.get(tableID) == null)
            tooLittleMap.put(tableID, 0);
          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
          if (diff > biggestDifference) {
            biggestDifference = diff;
            biggestDifferenceTable = tableID;
          }
        }
        if (biggestDifference < 2) {
          // the per-table counts are already within one of each other: fall back to the busiest table on the donor
          table = busiest(tooMuch.status == null ? null : tooMuch.status.tableMap);
        } else {
          table = biggestDifferenceTable;
        }
      } else {
        // just balance the given table
        table = tableToBalance;
      }
      // the donor reports no tables at all, so there is nothing that could be moved
      if (table == null)
        return result;

      Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
      try {
        if (onlineTabletsForTable == null) {
          onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
          for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
            onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
          onlineTablets.put(table, onlineTabletsForTable);
        }
      } catch (Exception ex) {
        log.error("Unable to select a tablet to move", ex);
        return result;
      }
      KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
      if (extent == null)
        return result;
      // do not nominate the same tablet twice within this call
      onlineTabletsForTable.remove(extent);

      // The donor's status may have no entry for the table (for example when tableToBalance is set but the status is
      // stale); skip the bookkeeping in that case rather than failing on a null count.
      Integer tooMuchCount = tooMuchMap.get(table);
      if (tooMuchCount != null)
        tooMuchMap.put(table, tooMuchCount - 1);

      // If a table grows from 1 tablet then tooLittleMap.get(table) can return null, since a single tablet server
      // holds all of its tablets. Treat the missing count as 0.
      Integer tooLittleCount = tooLittleMap.get(table);
      if (tooLittleCount == null) {
        tooLittleCount = 0;
      }
      tooLittleMap.put(table, tooLittleCount + 1);

      result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
    }
    return result;
  }

  /**
   * Returns a mutable copy of the online tablet count per table from the given status; empty if the status or its
   * table map is null.
   */
  static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
    Map<String,Integer> result = new HashMap<String,Integer>();
    if (status != null && status.tableMap != null) {
      Map<String,TableInfo> tableMap = status.tableMap;
      for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
        result.put(entry.getKey(), entry.getValue().onlineTablets);
      }
    }
    return result;
  }

  /**
   * Chooses the tablet with the greatest split creation time from the given tablets.
   *
   * @param tserver
   *          the server hosting the tablets (not consulted by this implementation)
   * @param extents
   *          the candidate tablets and their stats
   * @return the chosen tablet, or null if there are no candidates; on equal times the later-iterated entry wins
   */
  static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
    if (extents.size() == 0)
      return null;
    KeyExtent mostRecentlySplit = null;
    long splitTime = 0;
    for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
      if (entry.getValue().splitCreationTime >= splitTime) {
        splitTime = entry.getValue().splitCreationTime;
        mostRecentlySplit = entry.getKey();
      }
    return mostRecentlySplit;
  }

  // define what it means for a table to be busy: the highest combined ingest and query rate.
  // Returns null when there are no tables to choose from.
  private static String busiest(Map<String,TableInfo> tables) {
    if (tables == null)
      return null;
    String result = null;
    double busiest = Double.NEGATIVE_INFINITY;
    for (Entry<String,TableInfo> entry : tables.entrySet()) {
      TableInfo info = entry.getValue();
      double busy = info.ingestRate + info.queryRate;
      if (busy > busiest) {
        busiest = busy;
        result = entry.getKey();
      }
    }
    return result;
  }

  /**
   * Assigns every unassigned tablet via {@link #getAssignment}, using the mapped value as the tablet's last server.
   * The chosen server (possibly null) is stored in {@code assignments}.
   */
  @Override
  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
      Map<KeyExtent,TServerInstance> assignments) {
    for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
      assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
    }
  }

  /**
   * Proposes migrations when there is at least one server and no migration is already in progress.
   *
   * @return 1000 when {@link #getMigrations} reports that more balancing is needed, otherwise 5000
   *         (NOTE(review): presumably the delay in milliseconds before the next call -- confirm against TabletBalancer)
   */
  @Override
  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
    // do we have any servers?
    if (current.size() > 0) {
      // Don't migrate if we have migrations in progress
      if (migrations.size() == 0) {
        if (getMigrations(current, migrationsOut))
          return 1 * 1000;
      }
    }
    return 5 * 1000;
  }

}