Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/144d9d5b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/144d9d5b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/144d9d5b

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 144d9d5bfd3f249cdabfb8a7f5d60543ff6cc6c1
Parents: 3458bfa bec36bc
Author: Eric Newton <eric.new...@gmail.com>
Authored: Tue Dec 17 15:03:37 2013 -0500
Committer: Eric Newton <eric.new...@gmail.com>
Committed: Tue Dec 17 15:03:37 2013 -0500

----------------------------------------------------------------------
 .../accumulo/server/master/balancer/DefaultLoadBalancer.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/144d9d5b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
index 9b88d74,0000000..1fcab46
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/balancer/DefaultLoadBalancer.java
@@@ -1,318 -1,0 +1,319 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.master.balancer;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedMap;
 +
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.master.thrift.TableInfo;
 +import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.master.state.TabletMigration;
 +import org.apache.log4j.Logger;
 +
 +public class DefaultLoadBalancer extends TabletBalancer {
 +  
 +  private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
 +  
 +  Iterator<TServerInstance> assignments;
 +  // if tableToBalance is set, then only balance the given table
 +  String tableToBalance = null;
 +  
 +  public DefaultLoadBalancer() {
 +    
 +  }
 +  
 +  public DefaultLoadBalancer(String table) {
 +    tableToBalance = table;
 +  }
 +  
 +  List<TServerInstance> randomize(Set<TServerInstance> locations) {
 +    List<TServerInstance> result = new ArrayList<TServerInstance>(locations);
 +    Collections.shuffle(result);
 +    return result;
 +  }
 +  
 +  public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
 +    if (locations.size() == 0)
 +      return null;
 +    
 +    if (last != null) {
 +      // Maintain locality
-       TServerInstance simple = new TServerInstance(last.getLocation(), "");
++      String fakeSessionID = " ";
++      TServerInstance simple = new TServerInstance(last.getLocation(), fakeSessionID);
 +      Iterator<TServerInstance> find = locations.tailMap(simple).keySet().iterator();
 +      if (find.hasNext()) {
 +        TServerInstance current = find.next();
 +        if (current.host().equals(last.host()))
 +          return current;
 +      }
 +    }
 +    
 +    // The strategy here is to walk through the locations and hand them back, one at a time
 +    // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
 +    if (assignments == null || !assignments.hasNext())
 +      assignments = randomize(locations.keySet()).iterator();
 +    TServerInstance result = assignments.next();
 +    if (!locations.containsKey(result)) {
 +      assignments = null;
 +      return randomize(locations.keySet()).iterator().next();
 +    }
 +    return result;
 +  }
 +  
 +  static class ServerCounts implements Comparable<ServerCounts> {
 +    public final TServerInstance server;
 +    public final int count;
 +    public final TabletServerStatus status;
 +    
 +    ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
 +      this.count = count;
 +      this.server = server;
 +      this.status = status;
 +    }
 +    
 +    public int compareTo(ServerCounts obj) {
 +      int result = count - obj.count;
 +      if (result == 0)
 +        return server.compareTo(obj.server);
 +      return result;
 +    }
 +  }
 +  
 +  public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
 +    boolean moreBalancingNeeded = false;
 +    try {
 +      // no moves possible
 +      if (current.size() < 2) {
 +        return false;
 +      }
 +      
 +      // Sort by total number of online tablets, per server
 +      int total = 0;
 +      ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
 +      for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
 +        int serverTotal = 0;
 +        if (entry.getValue() != null && entry.getValue().tableMap != null) {
 +          for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
 +            /**
 +             * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename
 +             */
 +            if (tableToBalance == null || tableToBalance.equals(e.getKey()))
 +              serverTotal += e.getValue().onlineTablets;
 +          }
 +        }
 +        totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
 +        total += serverTotal;
 +      }
 +      
 +      // order from low to high
 +      Collections.sort(totals);
 +      Collections.reverse(totals);
 +      int even = total / totals.size();
 +      int numServersOverEven = total % totals.size();
 +      
 +      // Move tablets from the servers with too many to the servers with
 +      // the fewest but only nominate tablets to move once. This allows us
 +      // to fill new servers with tablets from a mostly balanced server
 +      // very quickly. However, it may take several balancing passes to move
 +      // tablets from one hugely overloaded server to many slightly
 +      // under-loaded servers.
 +      int end = totals.size() - 1;
 +      int movedAlready = 0;
 +      for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
 +        ServerCounts tooMany = totals.get(tooManyIndex);
 +        int goal = even;
 +        if (tooManyIndex < numServersOverEven) {
 +          goal++;
 +        }
 +        int needToUnload = tooMany.count - goal;
 +        ServerCounts tooLittle = totals.get(end);
 +        int needToLoad = goal - tooLittle.count - movedAlready;
 +        if (needToUnload < 1 && needToLoad < 1) {
 +          break;
 +        }
 +        if (needToUnload >= needToLoad) {
 +          result.addAll(move(tooMany, tooLittle, needToLoad));
 +          end--;
 +          movedAlready = 0;
 +        } else {
 +          result.addAll(move(tooMany, tooLittle, needToUnload));
 +          movedAlready += needToUnload;
 +        }
 +        if (needToUnload > needToLoad)
 +          moreBalancingNeeded = true;
 +      }
 +      
 +    } finally {
 +      log.debug("balance ended with " + result.size() + " migrations");
 +    }
 +    return moreBalancingNeeded;
 +  }
 +  
 +  static class TableDiff {
 +    int diff;
 +    String table;
 +    
 +    public TableDiff(int diff, String table) {
 +      this.diff = diff;
 +      this.table = table;
 +    }
 +  };
 +  
 +  /**
 +   * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
 +   */
 +  List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
 +    
 +    List<TabletMigration> result = new ArrayList<TabletMigration>();
 +    if (count == 0)
 +      return result;
 +    
 +    Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
 +    // Copy counts so we can update them as we propose migrations
 +    Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
 +    Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
 +    
 +    for (int i = 0; i < count; i++) {
 +      String table;
 +      Integer tooLittleCount;
 +      if (tableToBalance == null) {
 +        // find a table to migrate
 +        // look for an uneven table count
 +        int biggestDifference = 0;
 +        String biggestDifferenceTable = null;
 +        for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
 +          String tableID = tableEntry.getKey();
 +          if (tooLittleMap.get(tableID) == null)
 +            tooLittleMap.put(tableID, 0);
 +          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
 +          if (diff > biggestDifference) {
 +            biggestDifference = diff;
 +            biggestDifferenceTable = tableID;
 +          }
 +        }
 +        if (biggestDifference < 2) {
 +          table = busiest(tooMuch.status.tableMap);
 +        } else {
 +          table = biggestDifferenceTable;
 +        }
 +      } else {
 +        // just balance the given table
 +        table = tableToBalance;
 +      }
 +      Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
 +      try {
 +        if (onlineTabletsForTable == null) {
 +          onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
 +          for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
 +            onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
 +          onlineTablets.put(table, onlineTabletsForTable);
 +        }
 +      } catch (Exception ex) {
 +        log.error("Unable to select a tablet to move", ex);
 +        return result;
 +      }
 +      KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
 +      onlineTabletsForTable.remove(extent);
 +      if (extent == null)
 +        return result;
 +      tooMuchMap.put(table, tooMuchMap.get(table) - 1);
 +      /**
 +       * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here
 +       * we check to see if in fact that is the case and if so set the value to 0.
 +       */
 +      tooLittleCount = tooLittleMap.get(table);
 +      if (tooLittleCount == null) {
 +        tooLittleCount = 0;
 +      }
 +      tooLittleMap.put(table, tooLittleCount + 1);
 +      
 +      result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
 +    }
 +    return result;
 +  }
 +  
 +  static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
 +    Map<String,Integer> result = new HashMap<String,Integer>();
 +    if (status != null && status.tableMap != null) {
 +      Map<String,TableInfo> tableMap = status.tableMap;
 +      for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
 +        result.put(entry.getKey(), entry.getValue().onlineTablets);
 +      }
 +    }
 +    return result;
 +  }
 +  
 +  static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
 +    if (extents.size() == 0)
 +      return null;
 +    KeyExtent mostRecentlySplit = null;
 +    long splitTime = 0;
 +    for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
 +      if (entry.getValue().splitCreationTime >= splitTime) {
 +        splitTime = entry.getValue().splitCreationTime;
 +        mostRecentlySplit = entry.getKey();
 +      }
 +    return mostRecentlySplit;
 +  }
 +  
 +  // define what it means for a tablet to be busy
 +  private static String busiest(Map<String,TableInfo> tables) {
 +    String result = null;
 +    double busiest = Double.NEGATIVE_INFINITY;
 +    for (Entry<String,TableInfo> entry : tables.entrySet()) {
 +      TableInfo info = entry.getValue();
 +      double busy = info.ingestRate + info.queryRate;
 +      if (busy > busiest) {
 +        busiest = busy;
 +        result = entry.getKey();
 +      }
 +    }
 +    return result;
 +  }
 +  
 +  @Override
 +  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
 +      Map<KeyExtent,TServerInstance> assignments) {
 +    for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
 +      assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
 +    }
 +  }
 +  
 +  @Override
 +  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
 +    // do we have any servers?
 +    if (current.size() > 0) {
 +      // Don't migrate if we have migrations in progress
 +      if (migrations.size() == 0) {
 +        if (getMigrations(current, migrationsOut))
 +          return 1 * 1000;
 +      }
 +    }
 +    return 5 * 1000;
 +  }
 +  
 +}

Reply via email to