Repository: accumulo Updated Branches: refs/heads/master 9952957c8 -> 51fbfaf0a
ACCUMULO-3439 Added RegexGroupBalancer Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/51fbfaf0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/51fbfaf0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/51fbfaf0 Branch: refs/heads/master Commit: 51fbfaf0a52dc89e8294c86c30164fb94c9f644c Parents: 9952957 Author: Keith Turner <ktur...@apache.org> Authored: Mon Jan 12 14:51:26 2015 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Mon Jan 12 14:51:26 2015 -0500 ---------------------------------------------------------------------- .../main/resources/examples/README.rgbalancer | 159 +++++ .../server/master/balancer/GroupBalancer.java | 707 +++++++++++++++++++ .../master/balancer/RegexGroupBalancer.java | 96 +++ .../master/balancer/TableLoadBalancer.java | 2 + .../master/balancer/GroupBalancerTest.java | 285 ++++++++ .../test/functional/RegexGroupBalanceIT.java | 192 +++++ 6 files changed, 1441 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/docs/src/main/resources/examples/README.rgbalancer ---------------------------------------------------------------------- diff --git a/docs/src/main/resources/examples/README.rgbalancer b/docs/src/main/resources/examples/README.rgbalancer new file mode 100644 index 0000000..f192a93 --- /dev/null +++ b/docs/src/main/resources/examples/README.rgbalancer @@ -0,0 +1,159 @@ +Title: Apache Accumulo Regex Group Balancer Example +Notice: Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + . 
+ http://www.apache.org/licenses/LICENSE-2.0 + . + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +For some data access patterns, it's important to spread groups of tablets within +a table out evenly. Accumulo has a balancer that can do this using a regular +expression to group tablets. This example shows how this balancer spreads 4 +groups of tablets within a table evenly across 17 tablet servers. + +The commands below create a table and add splits. For this example, we would +like tablets whose split points share the same first two digits to be spread +across different tservers. This gives us four groups of tablets: 01, 02, 03, and 04. + + root@accumulo> createtable testRGB + root@accumulo testRGB> addsplits -t testRGB 01b 01m 01r 01z 02b 02m 02r 02z 03b 03m 03r 03z 04a 04b 04c 04d 04e 04f 04g 04h 04i 04j 04k 04l 04m 04n 04o 04p + root@accumulo testRGB> tables -l + accumulo.metadata => !0 + accumulo.replication => +rep + accumulo.root => +r + testRGB => 2 + trace => 1 + +After adding the splits, we look at the locations in the metadata table.
+ + root@accumulo testRGB> scan -t accumulo.metadata -b 2; -e 2< -c loc + 2;01b loc:34a5f6e086b000c [] ip-10-1-2-25:9997 + 2;01m loc:34a5f6e086b000c [] ip-10-1-2-25:9997 + 2;01r loc:14a5f6e079d0011 [] ip-10-1-2-15:9997 + 2;01z loc:14a5f6e079d000f [] ip-10-1-2-13:9997 + 2;02b loc:34a5f6e086b000b [] ip-10-1-2-26:9997 + 2;02m loc:14a5f6e079d000c [] ip-10-1-2-28:9997 + 2;02r loc:14a5f6e079d0012 [] ip-10-1-2-27:9997 + 2;02z loc:14a5f6e079d0012 [] ip-10-1-2-27:9997 + 2;03b loc:14a5f6e079d000d [] ip-10-1-2-21:9997 + 2;03m loc:14a5f6e079d000e [] ip-10-1-2-20:9997 + 2;03r loc:14a5f6e079d000d [] ip-10-1-2-21:9997 + 2;03z loc:14a5f6e079d000e [] ip-10-1-2-20:9997 + 2;04a loc:34a5f6e086b000b [] ip-10-1-2-26:9997 + 2;04b loc:14a5f6e079d0010 [] ip-10-1-2-17:9997 + 2;04c loc:14a5f6e079d0010 [] ip-10-1-2-17:9997 + 2;04d loc:24a5f6e07d3000c [] ip-10-1-2-16:9997 + 2;04e loc:24a5f6e07d3000d [] ip-10-1-2-29:9997 + 2;04f loc:24a5f6e07d3000c [] ip-10-1-2-16:9997 + 2;04g loc:24a5f6e07d3000a [] ip-10-1-2-14:9997 + 2;04h loc:14a5f6e079d000c [] ip-10-1-2-28:9997 + 2;04i loc:34a5f6e086b000d [] ip-10-1-2-19:9997 + 2;04j loc:34a5f6e086b000d [] ip-10-1-2-19:9997 + 2;04k loc:24a5f6e07d30009 [] ip-10-1-2-23:9997 + 2;04l loc:24a5f6e07d3000b [] ip-10-1-2-22:9997 + 2;04m loc:24a5f6e07d30009 [] ip-10-1-2-23:9997 + 2;04n loc:24a5f6e07d3000b [] ip-10-1-2-22:9997 + 2;04o loc:34a5f6e086b000a [] ip-10-1-2-18:9997 + 2;04p loc:24a5f6e07d30008 [] ip-10-1-2-24:9997 + 2< loc:24a5f6e07d30008 [] ip-10-1-2-24:9997 + +Below, the information above has been reorganized to show which tablet groups +are on each tserver. The four tablets in group 03 are on only two tservers; +ideally, those tablets would be spread across four tservers. Note that the +default tablet (2<) is categorized as group 04 below.
+ + ip-10-1-2-13:9997 01 + ip-10-1-2-14:9997 04 + ip-10-1-2-15:9997 01 + ip-10-1-2-16:9997 04 04 + ip-10-1-2-17:9997 04 04 + ip-10-1-2-18:9997 04 + ip-10-1-2-19:9997 04 04 + ip-10-1-2-20:9997 03 03 + ip-10-1-2-21:9997 03 03 + ip-10-1-2-22:9997 04 04 + ip-10-1-2-23:9997 04 04 + ip-10-1-2-24:9997 04 04 + ip-10-1-2-25:9997 01 01 + ip-10-1-2-26:9997 02 04 + ip-10-1-2-27:9997 02 02 + ip-10-1-2-28:9997 02 04 + ip-10-1-2-29:9997 04 + +To remedy this situation, the RegexGroupBalancer is configured with the +commands below. The configured regular expression selects the first two digits +from a tablet's end row as the group ID. Tablets that don't match and the +default tablet are configured to be in group 04. + + root@accumulo testRGB> config -t testRGB -s table.custom.balancer.group.regex.pattern=(\\d\\d).* + root@accumulo testRGB> config -t testRGB -s table.custom.balancer.group.regex.default=04 + root@accumulo testRGB> config -t testRGB -s table.balancer=org.apache.accumulo.server.master.balancer.RegexGroupBalancer + +After waiting a short while, look at the tablet locations again; all is good.
+ + root@accumulo testRGB> scan -t accumulo.metadata -b 2; -e 2< -c loc + 2;01b loc:34a5f6e086b000a [] ip-10-1-2-18:9997 + 2;01m loc:34a5f6e086b000c [] ip-10-1-2-25:9997 + 2;01r loc:14a5f6e079d0011 [] ip-10-1-2-15:9997 + 2;01z loc:14a5f6e079d000f [] ip-10-1-2-13:9997 + 2;02b loc:34a5f6e086b000b [] ip-10-1-2-26:9997 + 2;02m loc:14a5f6e079d000c [] ip-10-1-2-28:9997 + 2;02r loc:34a5f6e086b000d [] ip-10-1-2-19:9997 + 2;02z loc:14a5f6e079d0012 [] ip-10-1-2-27:9997 + 2;03b loc:24a5f6e07d3000d [] ip-10-1-2-29:9997 + 2;03m loc:24a5f6e07d30009 [] ip-10-1-2-23:9997 + 2;03r loc:14a5f6e079d000d [] ip-10-1-2-21:9997 + 2;03z loc:14a5f6e079d000e [] ip-10-1-2-20:9997 + 2;04a loc:34a5f6e086b000b [] ip-10-1-2-26:9997 + 2;04b loc:34a5f6e086b000c [] ip-10-1-2-25:9997 + 2;04c loc:14a5f6e079d0010 [] ip-10-1-2-17:9997 + 2;04d loc:14a5f6e079d000e [] ip-10-1-2-20:9997 + 2;04e loc:24a5f6e07d3000d [] ip-10-1-2-29:9997 + 2;04f loc:24a5f6e07d3000c [] ip-10-1-2-16:9997 + 2;04g loc:24a5f6e07d3000a [] ip-10-1-2-14:9997 + 2;04h loc:14a5f6e079d000c [] ip-10-1-2-28:9997 + 2;04i loc:14a5f6e079d0011 [] ip-10-1-2-15:9997 + 2;04j loc:34a5f6e086b000d [] ip-10-1-2-19:9997 + 2;04k loc:14a5f6e079d0012 [] ip-10-1-2-27:9997 + 2;04l loc:14a5f6e079d000f [] ip-10-1-2-13:9997 + 2;04m loc:24a5f6e07d30009 [] ip-10-1-2-23:9997 + 2;04n loc:24a5f6e07d3000b [] ip-10-1-2-22:9997 + 2;04o loc:34a5f6e086b000a [] ip-10-1-2-18:9997 + 2;04p loc:14a5f6e079d000d [] ip-10-1-2-21:9997 + 2< loc:24a5f6e07d30008 [] ip-10-1-2-24:9997 + +Once again, the data above is transformed to make it easier to see which groups +are on tservers. The transformed data below shows that all groups are now +evenly spread. 
+ + ip-10-1-2-13:9997 01 04 + ip-10-1-2-14:9997 04 + ip-10-1-2-15:9997 01 04 + ip-10-1-2-16:9997 04 + ip-10-1-2-17:9997 04 + ip-10-1-2-18:9997 01 04 + ip-10-1-2-19:9997 02 04 + ip-10-1-2-20:9997 03 04 + ip-10-1-2-21:9997 03 04 + ip-10-1-2-22:9997 04 + ip-10-1-2-23:9997 03 04 + ip-10-1-2-24:9997 04 + ip-10-1-2-25:9997 01 04 + ip-10-1-2-26:9997 02 04 + ip-10-1-2-27:9997 02 04 + ip-10-1-2-28:9997 02 04 + ip-10-1-2-29:9997 03 04 + +If you need this functionality but a regular expression does not meet your +needs, then extend GroupBalancer. This allows you to specify a partitioning +function in Java. Use the RegexGroupBalancer source as an example. http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java new file mode 100644 index 0000000..8feeb81 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java @@ -0,0 +1,707 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.master.balancer; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; + +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ComparablePair; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletMigration; +import org.apache.commons.lang.mutable.MutableInt; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterators; +import com.google.common.collect.Multimap; +import com.google.common.collect.Table; + +/** + * A balancer that evenly spreads groups of tablets across all tablet servers. This balancer accomplishes the following two goals: + * + * <ul> + * <li/>Evenly spreads each group across all tservers. + * <li/>Minimizes the total number of groups on each tserver.
+ * </ul> + * + * <p> + * To use this balancer you must extend it and implement {@link #getPartitioner()}. See {@link RegexGroupBalancer} as an example. + */ + +public abstract class GroupBalancer extends TabletBalancer { + + private final String tableId; + // System.currentTimeMillis() taken by the most recent balance pass that got past the wait-time check. + private long lastRun = 0; + + /** + * @return A function that groups tablets into named groups. A fresh function is requested by getAssignments, by balance, and again when balance + * turns its planned moves into migrations. + */ + protected abstract Function<KeyExtent,String> getPartitioner(); + + public GroupBalancer(String tableId) { + this.tableId = tableId; + } + + /** + * @return (tablet extent, current location) pairs for this balancer's table. The default implementation scans the metadata table for the table id passed + * to the constructor. + */ + protected Iterable<Pair<KeyExtent,Location>> getLocationProvider() { + return new MetadataLocationProvider(); + } + + /** + * The minimum amount of time, in milliseconds, to wait between balancing passes. + */ + protected long getWaitTime() { + return 60000; + } + + /** + * Assigns each unassigned tablet back to a current tserver on the host of its last location when one is found. The remaining tablets are sorted by group + * (then extent) and handed out round-robin across all current tservers, which spreads each group across tservers. + */ + @Override + public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned, + Map<KeyExtent,TServerInstance> assignments) { + + if (current.size() == 0) { + return; + } + + Function<KeyExtent,String> partitioner = getPartitioner(); + + List<ComparablePair<String,KeyExtent>> tabletsByGroup = new ArrayList<>(); + for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) { + TServerInstance last = entry.getValue(); + if (last != null) { + // Maintain locality + // NOTE(review): the single-space fake session id is presumably chosen to sort before real session ids, so tailMap starts at last's location -- confirm against TServerInstance ordering + String fakeSessionID = " "; + TServerInstance simple = new TServerInstance(last.getLocation(), fakeSessionID); + Iterator<TServerInstance> find = current.tailMap(simple).keySet().iterator(); + if (find.hasNext()) { + TServerInstance tserver = find.next(); + if (tserver.host().equals(last.host())) { + assignments.put(entry.getKey(), tserver); + continue; + } + } + } + + tabletsByGroup.add(new ComparablePair<String,KeyExtent>(partitioner.apply(entry.getKey()), entry.getKey())); + } + + Collections.sort(tabletsByGroup); + + Iterator<TServerInstance> tserverIter = Iterators.cycle(current.keySet()); + for (ComparablePair<String,KeyExtent> pair : tabletsByGroup) { + KeyExtent ke = pair.getSecond(); + assignments.put(ke,
+ tserverIter.next()); + } + + } + + /** + * Plans migrations that spread each group evenly across tservers and even out extra tablets (see the comment below). Nothing is planned while migrations + * are outstanding, with fewer than two tservers, within getWaitTime() of the previous pass, or when any tablet lacks a location in current. + * + * @return always 5000; presumably the caller treats this as milliseconds to wait before calling again -- confirm against TabletBalancer + */ + @Override + public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) { + + // The terms extra and expected are used in this code. Expected tablets is the number of tablets a tserver must have for a given group and is + // numInGroup/numTservers. Extra tablets are any tablets more than the number expected for a given group. If numInGroup % numTservers > 0, then a tserver + // may have one extra tablet for a group. + // + // Assume we have 4 tservers and group A has 11 tablets. + // * expected tablets : group A is expected to have 2 tablets on each tserver + // * extra tablets : group A may have an additional tablet on each tserver. Group A has a total of 3 extra tablets. + // + // This balancer also evens out the extra tablets across all groups. The terms extraExpected and extraExtra are used to describe these tablets. + // ExtraExpected is totalExtra/numTservers. ExtraExtra is totalExtra%numTservers. Each tserver should have at least extraExpected extra tablets (the + // expectedExtra variable below) and at most one extraExtra tablet. All extra tablets on a tserver must be from different groups. + // + // Assume we have 6 tservers and three groups (G1, G2, G3) with 9 tablets each. Each tserver is expected to have one tablet from each group and could + // possibly have 2 tablets from a group. Below is an illustration of an ideal balancing of extra tablets. To understand the illustration, the first column + // shows tserver T1 with 2 tablets from G1, 1 tablet from G2, and two tablets from G3. EE means empty; it is there so Eclipse formatting does not mess up + // the table. + // + // T1 | T2 | T3 | T4 | T5 | T6 + // ---+----+----+----+----+----- + // G3 | G2 | G3 | EE | EE | EE <-- extra extra tablets + // G1 | G1 | G1 | G2 | G3 | G2 <-- extra expected tablets.
+ // G1 | G1 | G1 | G1 | G1 | G1 <-- expected tablets for group 1 + // G2 | G2 | G2 | G2 | G2 | G2 <-- expected tablets for group 2 + // G3 | G3 | G3 | G3 | G3 | G3 <-- expected tablets for group 3 + // + // We do not want to balance the extra tablets like the following. There are two problems with this. First, extra tablets are not evenly spread. Since + // there are a total of 9 extra tablets, every tserver is expected to have at least one extra tablet. Second, tserver T1 has two extra tablets for group + // G1. This violates the principle that a tserver can only have one extra tablet for a given group. + // + // T1 | T2 | T3 | T4 | T5 | T6 + // ---+----+----+----+----+----- + // G1 | EE | EE | EE | EE | EE <--- one extra tablet from group 1 + // G3 | G3 | G3 | EE | EE | EE <--- three extra tablets from group 3 + // G2 | G2 | G2 | EE | EE | EE <--- three extra tablets from group 2 + // G1 | G1 | EE | EE | EE | EE <--- two extra tablets from group 1 + // G1 | G1 | G1 | G1 | G1 | G1 <-- expected tablets for group 1 + // G2 | G2 | G2 | G2 | G2 | G2 <-- expected tablets for group 2 + // G3 | G3 | G3 | G3 | G3 | G3 <-- expected tablets for group 3 + + // Skip this pass while migrations are outstanding or when there are fewer than two tservers to balance across. + if (migrations.size() > 0 || current.size() < 2) { + return 5000; + } + + // Rate limit passes; each one iterates every tablet location of the table (and again in populateMigrations). + if (System.currentTimeMillis() - lastRun < getWaitTime()) { + return 5000; + } + + // NOTE(review): lastRun is updated before the location scan, so a pass aborted below still waits a full getWaitTime() before the next attempt. + lastRun = System.currentTimeMillis(); + + MapCounter<String> groupCounts = new MapCounter<>(); + Map<TServerInstance,TserverGroupInfo> tservers = new HashMap<>(); + + for (TServerInstance tsi : current.keySet()) { + tservers.put(tsi, new TserverGroupInfo(tsi)); + } + + Function<KeyExtent,String> partitioner = getPartitioner(); + + // collect stats about current state + for (Pair<KeyExtent,Location> entry : getLocationProvider()) { + String group = partitioner.apply(entry.getFirst()); + Location loc = entry.getSecond(); + + if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) { + // a tablet is unassigned or hosted on a tserver not in current; skip this pass rather than plan from an inconsistent view + return 5000; + } + + groupCounts.increment(group, 1);
+ TserverGroupInfo tgi = tservers.get(loc.getTserverInstance()); + tgi.addGroup(group); + } + + Map<String,Integer> expectedCounts = new HashMap<>(); + + int totalExtra = 0; + for (String group : groupCounts.keySet()) { + long groupCount = groupCounts.get(group); + totalExtra += groupCount % current.size(); + expectedCounts.put(group, (int) (groupCount / current.size())); + } + + // The number of extra tablets (summed over all groups) every tserver should have at least; maxExtraGroups is the most groups any tserver should hold + // extra tablets for. + int expectedExtra = totalExtra / current.size(); + int maxExtraGroups = expectedExtra + 1; + + expectedCounts = Collections.unmodifiableMap(expectedCounts); + tservers = Collections.unmodifiableMap(tservers); + + for (TserverGroupInfo tgi : tservers.values()) { + tgi.finishedAdding(expectedCounts); + } + + Moves moves = new Moves(); + + // Plan moves in phases: fill per-group deficits, even out how many groups each tserver holds extras for, split up multiple extras of one group on one + // tserver, then spread the remaining extras. Moves are only recorded here; populateMigrations turns them into migrations. + balanceExpected(tservers, moves); + balanceExtraExpected(tservers, expectedExtra, moves); + balanceExtraMultiple(tservers, maxExtraGroups, moves); + balanceExtraExtra(tservers, maxExtraGroups, moves); + + populateMigrations(current, migrationsOut, moves); + + return 5000; + } + + /** + * The tserver hosting a tablet. {@link #NONE} (a null tserver instance) means the tablet has no current location. + */ + public static class Location { + public static final Location NONE = new Location(); + private final TServerInstance tserverInstance; + + public Location() { + this(null); + } + + public Location(TServerInstance tsi) { + tserverInstance = tsi; + } + + public TServerInstance getTserverInstance() { + return tserverInstance; + } + + @Override + public boolean equals(Object o) { + // NOTE(review): equals is overridden without hashCode, and tserverInstance.equals(...) throws NullPointerException when this instance's tserverInstance is null but the other's is not (e.g. NONE.equals(someLocation)). Objects.equals plus a matching hashCode would be safer; callers in this file only invoke loc.equals(Location.NONE), where a null receiver cannot throw. + if (o instanceof Location) { + Location ol = ((Location) o); + if (tserverInstance == ol.tserverInstance) { + return true; + } + return tserverInstance.equals(ol.tserverInstance); + } + return false; + } + } + + /** + * Per-tserver bookkeeping of how many tablets of each group it hosts relative to the expected per-tserver count. addGroup may only be called before + * finishedAdding; moveOff and moveTo only after. + */ + private static class TserverGroupInfo { + + private Map<String,Integer> expectedCounts; + // tablets of each group hosted when the locations were scanned; reduced by moveOff + private final Map<String,MutableInt> initialCounts = new HashMap<>(); + // per group, tablets above the expected count + private final Map<String,Integer> extraCounts = new HashMap<>(); + // per group, tablets still missing to reach the expected count + private final Map<String,Integer> expectedDeficits = new HashMap<>(); + + private final
+ TServerInstance tsi; + private boolean finishedAdding = false; + + TserverGroupInfo(TServerInstance tsi) { + this.tsi = tsi; + } + + public void addGroup(String group) { + Preconditions.checkState(!finishedAdding); + + MutableInt mi = initialCounts.get(group); + if (mi == null) { + mi = new MutableInt(); + initialCounts.put(group, mi); + } + + mi.increment(); + } + + /** + * Freezes the initial counts and records, for every group in expectedCounts, how far this tserver is below (deficit) or above (extra) the expected count. + */ + public void finishedAdding(Map<String,Integer> expectedCounts) { + Preconditions.checkState(!finishedAdding); + finishedAdding = true; + this.expectedCounts = expectedCounts; + + for (Entry<String,Integer> entry : expectedCounts.entrySet()) { + String group = entry.getKey(); + int expected = entry.getValue(); + + MutableInt count = initialCounts.get(group); + int num = count == null ? 0 : count.intValue(); + + if (num < expected) { + expectedDeficits.put(group, expected - num); + } else if (num > expected) { + extraCounts.put(group, num - expected); + } + } + + } + + /** + * Records that num tablets of group leave this tserver. Only extra tablets may be moved off. + */ + public void moveOff(String group, int num) { + Preconditions.checkArgument(num > 0); + Preconditions.checkState(finishedAdding); + + Integer extraCount = extraCounts.get(group); + + Preconditions.checkArgument(extraCount != null && extraCount >= num, "group=%s num=%s extraCount=%s", group, num, extraCount); + + MutableInt initialCount = initialCounts.get(group); + + Preconditions.checkArgument(initialCount.intValue() >= num); + + initialCount.subtract(num); + + if (extraCount - num == 0) { + extraCounts.remove(group); + } else { + extraCounts.put(group, extraCount - num); + } + } + + /** + * Records that num tablets of group arrive at this tserver. They first fill any deficit for the group; the remainder become extra tablets. + */ + public void moveTo(String group, int num) { + Preconditions.checkArgument(num > 0); + // NOTE(review): expectedCounts is null until finishedAdding runs, so this check would throw NullPointerException before the checkState below could report the misuse. + Preconditions.checkArgument(expectedCounts.containsKey(group)); + Preconditions.checkState(finishedAdding); + + Integer deficit = expectedDeficits.get(group); + if (deficit != null) { + if (num >= deficit) { + expectedDeficits.remove(group); + num -= deficit; + } else { + expectedDeficits.put(group, deficit - num); + num = 0; + } + } + + if (num > 0) { + Integer extra =
+ extraCounts.get(group); + if (extra == null) { + extra = 0; + } + + extraCounts.put(group, extra + num); + } + + // TODO could check extra constraints + } + + /** + * @return read-only view of the per-group deficits; it reflects later moves + */ + public Map<String,Integer> getExpectedDeficits() { + Preconditions.checkState(finishedAdding); + return Collections.unmodifiableMap(expectedDeficits); + } + + /** + * @return read-only view of the per-group extra counts; it reflects later moves + */ + public Map<String,Integer> getExtras() { + Preconditions.checkState(finishedAdding); + return Collections.unmodifiableMap(extraCounts); + } + + public TServerInstance getTserverInstance() { + return tsi; + } + + @Override + public int hashCode() { + return tsi.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof TserverGroupInfo) { + TserverGroupInfo otgi = (TserverGroupInfo) o; + return tsi.equals(otgi.tsi); + } + + return false; + } + + @Override + public String toString() { + return tsi.toString(); + } + + } + + /** + * A planned transfer of count tablets of one group to dest. + */ + private static class Move { + TserverGroupInfo dest; + int count; + + public Move(TserverGroupInfo dest, int num) { + this.dest = dest; + this.count = num; + } + } + + /** + * Planned transfers keyed by (source tserver, group). move() also updates both tservers' bookkeeping; removeMove() hands out one tablet's destination at a + * time. + */ + private static class Moves { + + private final Table<TServerInstance,String,List<Move>> moves = HashBasedTable.create(); + + public void move(String group, int num, TserverGroupInfo src, TserverGroupInfo dest) { + Preconditions.checkArgument(num > 0); + Preconditions.checkArgument(!src.equals(dest)); + + src.moveOff(group, num); + dest.moveTo(group, num); + + List<Move> srcMoves = moves.get(src.getTserverInstance(), group); + if (srcMoves == null) { + srcMoves = new ArrayList<>(); + moves.put(src.getTserverInstance(), group, srcMoves); + } + + srcMoves.add(new Move(dest, num)); + } + + /** + * Consumes one tablet from the most recently added planned move for (src, group). + * + * @return that move's destination, or null if nothing remains planned for src and group + */ + public TServerInstance removeMove(TServerInstance src, String group) { + List<Move> srcMoves = moves.get(src, group); + if (srcMoves == null) { + return null; + } + + Move move = srcMoves.get(srcMoves.size() - 1); + TServerInstance ret = move.dest.getTserverInstance(); + + move.count--; + if (move.count == 0) { + srcMoves.remove(srcMoves.size() - 1); + if
+ (srcMoves.size() == 0) { + moves.remove(src, group); + } + } + + return ret; + } + } + + /** + * Last phase: moves single extra tablets from tservers holding extras for more than maxExtraGroups groups to tservers holding extras for fewer than + * maxExtraGroups groups, choosing groups the destination has no extra tablet of. + */ + private void balanceExtraExtra(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) { + Table<String,TServerInstance,TserverGroupInfo> surplusExtra = HashBasedTable.create(); + for (TserverGroupInfo tgi : tservers.values()) { + Map<String,Integer> extras = tgi.getExtras(); + if (extras.size() > maxExtraGroups) { + for (String group : extras.keySet()) { + surplusExtra.put(group, tgi.getTserverInstance(), tgi); + } + } + } + + ArrayList<Pair<String,TServerInstance>> serversGroupsToRemove = new ArrayList<>(); + ArrayList<TServerInstance> serversToRemove = new ArrayList<>(); + + for (TserverGroupInfo destTgi : tservers.values()) { + if (surplusExtra.size() == 0) { + break; + } + + Map<String,Integer> extras = destTgi.getExtras(); + if (extras.size() < maxExtraGroups) { + serversToRemove.clear(); + serversGroupsToRemove.clear(); + for (String group : surplusExtra.rowKeySet()) { + if (!extras.containsKey(group)) { + TserverGroupInfo srcTgi = surplusExtra.row(group).values().iterator().next(); + + moves.move(group, 1, srcTgi, destTgi); + + if (srcTgi.getExtras().size() <= maxExtraGroups) { + serversToRemove.add(srcTgi.getTserverInstance()); + } else { + serversGroupsToRemove.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance())); + } + + if (destTgi.getExtras().size() >= maxExtraGroups) { + break; + } + } + } + + surplusExtra.columnKeySet().removeAll(serversToRemove); + for (Pair<String,TServerInstance> pair : serversGroupsToRemove) { + surplusExtra.remove(pair.getFirst(), pair.getSecond()); + } + } + } + } + + /** + * Third phase: where a tserver holds more than one extra tablet of a group, moves them one at a time to tservers that have no extra tablet of that group + * and hold extras for fewer than maxExtraGroups groups. + */ + private void balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) { + Multimap<String,TserverGroupInfo> extraMultiple = HashMultimap.create(); + + for (TserverGroupInfo tgi : tservers.values()) { + Map<String,Integer> extras = tgi.getExtras(); + for (Entry<String,Integer> entry :
+ extras.entrySet()) { + if (entry.getValue() > 1) { + extraMultiple.put(entry.getKey(), tgi); + } + } + } + + ArrayList<Pair<String,TserverGroupInfo>> serversToRemove = new ArrayList<>(); + for (TserverGroupInfo destTgi : tservers.values()) { + Map<String,Integer> extras = destTgi.getExtras(); + if (extras.size() < maxExtraGroups) { + serversToRemove.clear(); + for (String group : extraMultiple.keySet()) { + if (!extras.containsKey(group)) { + Collection<TserverGroupInfo> sources = extraMultiple.get(group); + Iterator<TserverGroupInfo> iter = sources.iterator(); + TserverGroupInfo srcTgi = iter.next(); + + int num = srcTgi.getExtras().get(group); + + moves.move(group, 1, srcTgi, destTgi); + + // num was read before the move, so a source that had two extras of this group now has one and no longer belongs in extraMultiple + if (num == 2) { + serversToRemove.add(new Pair<String,TserverGroupInfo>(group, srcTgi)); + } + + if (destTgi.getExtras().size() >= maxExtraGroups) { + break; + } + } + } + + for (Pair<String,TserverGroupInfo> pair : serversToRemove) { + extraMultiple.remove(pair.getFirst(), pair.getSecond()); + } + + if (extraMultiple.size() == 0) { + break; + } + } + } + } + + /** + * Second phase: moves extra tablets from tservers holding extras for more than expectedExtra groups to tservers holding extras for fewer than + * expectedExtra groups, choosing groups the destination has no extra tablet of. + */ + private void balanceExtraExpected(Map<TServerInstance,TserverGroupInfo> tservers, int expectedExtra, Moves moves) { + + Table<String,TServerInstance,TserverGroupInfo> extraSurplus = HashBasedTable.create(); + + for (TserverGroupInfo tgi : tservers.values()) { + Map<String,Integer> extras = tgi.getExtras(); + if (extras.size() > expectedExtra) { + for (String group : extras.keySet()) { + extraSurplus.put(group, tgi.getTserverInstance(), tgi); + } + } + } + + ArrayList<TServerInstance> emptyServers = new ArrayList<>(); + ArrayList<Pair<String,TServerInstance>> emptyServerGroups = new ArrayList<>(); + for (TserverGroupInfo destTgi : tservers.values()) { + if (extraSurplus.size() == 0) { + break; + } + + Map<String,Integer> extras = destTgi.getExtras(); + if (extras.size() < expectedExtra) { + emptyServers.clear(); + emptyServerGroups.clear(); + nextGroup: for (String group : extraSurplus.rowKeySet()) { + if
+ (!extras.containsKey(group)) { + Iterator<TserverGroupInfo> iter = extraSurplus.row(group).values().iterator(); + TserverGroupInfo srcTgi = iter.next(); + + // removals from extraSurplus are deferred until after this loop, so skip sources that no longer exceed expectedExtra + while (srcTgi.getExtras().size() <= expectedExtra) { + if (iter.hasNext()) { + srcTgi = iter.next(); + } else { + continue nextGroup; + } + } + + moves.move(group, 1, srcTgi, destTgi); + + if (srcTgi.getExtras().size() <= expectedExtra) { + emptyServers.add(srcTgi.getTserverInstance()); + } else if (srcTgi.getExtras().get(group) == null) { + emptyServerGroups.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance())); + } + + if (destTgi.getExtras().size() >= expectedExtra) { + break; + } + } + } + + if (emptyServers.size() > 0) { + extraSurplus.columnKeySet().removeAll(emptyServers); + } + + for (Pair<String,TServerInstance> pair : emptyServerGroups) { + extraSurplus.remove(pair.getFirst(), pair.getSecond()); + } + + } + } + } + + /** + * First phase: for every tserver below the expected count of a group, moves tablets of that group from tservers holding extra tablets of it until the + * deficit is filled. + */ + private void balanceExpected(Map<TServerInstance,TserverGroupInfo> tservers, Moves moves) { + Multimap<String,TserverGroupInfo> groupDefecits = HashMultimap.create(); + Multimap<String,TserverGroupInfo> groupSurplus = HashMultimap.create(); + + for (TserverGroupInfo tgi : tservers.values()) { + for (String group : tgi.getExpectedDeficits().keySet()) { + groupDefecits.put(group, tgi); + } + + for (String group : tgi.getExtras().keySet()) { + groupSurplus.put(group, tgi); + } + } + + for (String group : groupDefecits.keySet()) { + Collection<TserverGroupInfo> defecitServers = groupDefecits.get(group); + for (TserverGroupInfo defecitTsi : defecitServers) { + int numToMove = defecitTsi.getExpectedDeficits().get(group); + + Iterator<TserverGroupInfo> surplusIter = groupSurplus.get(group).iterator(); + while (numToMove > 0) { + // cannot run out: a group's extras exceed its deficits by groupCount % numTservers, as computed in balance() + TserverGroupInfo surplusTsi = surplusIter.next(); + + int available = surplusTsi.getExtras().get(group); + + // this source's extras of the group are about to be used up + if (numToMove >= available) { + surplusIter.remove(); + } + + int transfer = Math.min(numToMove, available); + + numToMove -= transfer; +
+ moves.move(group, transfer, surplusTsi, defecitTsi); + } + } + } + } + + /** + * Re-scans tablet locations and turns planned moves into migrations, one tablet at a time. If any tablet is now unassigned or on a tserver not in current, + * migrationsOut is cleared entirely and nothing is migrated this pass. + */ + private void populateMigrations(SortedMap<TServerInstance,TabletServerStatus> current, List<TabletMigration> migrationsOut, Moves moves) { + Function<KeyExtent,String> partitioner = getPartitioner(); + + for (Pair<KeyExtent,Location> entry : getLocationProvider()) { + String group = partitioner.apply(entry.getFirst()); + Location loc = entry.getSecond(); + + if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) { + migrationsOut.clear(); + return; + } + + TServerInstance dest = moves.removeMove(loc.getTserverInstance(), group); + if (dest != null) { + migrationsOut.add(new TabletMigration(entry.getFirst(), loc.getTserverInstance(), dest)); + } + } + } + + /** + * Folds the entries of one metadata row into a (tablet extent, current location) pair; the location is Location.NONE when the row has no current-location + * entry. + */ + static class LocationFunction implements Function<Iterator<Entry<Key,Value>>,Pair<KeyExtent,Location>> { + @Override + public Pair<KeyExtent,Location> apply(Iterator<Entry<Key,Value>> input) { + Location loc = Location.NONE; + KeyExtent extent = null; + while (input.hasNext()) { + Entry<Key,Value> entry = input.next(); + if (entry.getKey().getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)) { + loc = new Location(new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier())); + } else if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(entry.getKey())) { + extent = new KeyExtent(entry.getKey().getRow(), entry.getValue()); + } + } + + return new Pair<KeyExtent,Location>(extent, loc); + } + + } + + /** + * Iterates (extent, location) pairs for this balancer's table by scanning the metadata table through an IsolatedScanner, one row per tablet. + */ + class MetadataLocationProvider implements Iterable<Pair<KeyExtent,Location>> { + + @Override + public Iterator<Pair<KeyExtent,Location>> iterator() { + try { + Scanner scanner = new IsolatedScanner(context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY)); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+ MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); + scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + + RowIterator rowIter = new RowIterator(scanner); + + return Iterators.transform(rowIter, new LocationFunction()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/server/base/src/main/java/org/apache/accumulo/server/master/balancer/RegexGroupBalancer.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/RegexGroupBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/RegexGroupBalancer.java new file mode 100644 index 0000000..9ebf178 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/RegexGroupBalancer.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.accumulo.server.master.balancer; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Function; + +/** + * A {@link GroupBalancer} that groups tablets using a configurable regex. To use this balancer configure the following settings for your table then configure + * this balancer for your table. + * + * <ul> + * <li/>Set {@code table.custom.balancer.group.regex.pattern} to a regular expression. This regular expression must have one group. The regex is applied to the + * tablet end row and whatever the regex group matches is used as the group. For example with a regex of {@code (\d\d).*} and an end row of {@code 12abc}, the + * group for the tablet would be {@code 12}. + * <li/>Set {@code table.custom.balancer.group.regex.default} to a default group. This group is returned for the last tablet in the table and tablets for which + * the regex does not match. + * <li/>Optionally set {@code table.custom.balancer.group.regex.wait.time} to time (can use time suffixes). This determines how long to wait between balancing. + * Since this balancer scans the metadata table, may want to set this higher for large tables. 
+ * </ul> + */ + +public class RegexGroupBalancer extends GroupBalancer { + + public static final String REGEX_PROPERTY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + "balancer.group.regex.pattern"; + public static final String DEFAUT_GROUP_PROPERTY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + "balancer.group.regex.default"; + public static final String WAIT_TIME_PROPERTY = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey() + "balancer.group.regex.wait.time"; + + private final String tableId; + + public RegexGroupBalancer(String tableId) { + super(tableId); + this.tableId = tableId; + } + + @Override + protected long getWaitTime() { + Map<String,String> customProps = configuration.getTableConfiguration(tableId).getAllPropertiesWithPrefix(Property.TABLE_ARBITRARY_PROP_PREFIX); + if (customProps.containsKey(WAIT_TIME_PROPERTY)) { + return AccumuloConfiguration.getTimeInMillis(customProps.get(WAIT_TIME_PROPERTY)); + } + + return super.getWaitTime(); + } + + @Override + protected Function<KeyExtent,String> getPartitioner() { + + Map<String,String> customProps = configuration.getTableConfiguration(tableId).getAllPropertiesWithPrefix(Property.TABLE_ARBITRARY_PROP_PREFIX); + String regex = customProps.get(REGEX_PROPERTY); + final String defaultGroup = customProps.get(DEFAUT_GROUP_PROPERTY); + + final Pattern pattern = Pattern.compile(regex); + + return new Function<KeyExtent,String>() { + + @Override + public String apply(KeyExtent input) { + Text er = input.getEndRow(); + if (er == null) { + return defaultGroup; + } + + Matcher matcher = pattern.matcher(er.toString()); + if (matcher.matches() && matcher.groupCount() == 1) { + return matcher.group(1); + } + + return defaultGroup; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java index 5eae890..ade59e3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/TableLoadBalancer.java @@ -84,6 +84,8 @@ public class TableLoadBalancer extends TabletBalancer { perTableBalancers.put(table, balancer); balancer.init(configuration); } + + log.info("Loaded new class " + clazzName + " for table " + table); } catch (Exception e) { log.warn("Failed to load table balancer class " + clazzName + " for table " + table, e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java new file mode 100644 index 0000000..6e31001 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.server.master.balancer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletMigration; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +public class GroupBalancerTest { + + private static Function<KeyExtent,String> partitioner = new Function<KeyExtent,String>() { + + @Override + public String apply(KeyExtent input) { + return input.getEndRow().toString().substring(0, 2); + } + }; + + public static class TabletServers { + private final Set<TServerInstance> tservers = new HashSet<>(); + private final Map<KeyExtent,TServerInstance> tabletLocs = new HashMap<>(); + + public void addTservers(String... 
locs) { + for (String loc : locs) { + addTserver(loc); + } + } + + public void addTserver(String loc) { + tservers.add(new TServerInstance(loc, 6)); + } + + public void addTablet(String er, String location) { + TServerInstance tsi = new TServerInstance(location, 6); + tabletLocs.put(new KeyExtent(new Text("b"), er == null ? null : new Text(er), null), new TServerInstance(location, 6)); + tservers.add(tsi); + } + + public void balance() { + GroupBalancer balancer = new GroupBalancer("1") { + + @Override + protected Iterable<Pair<KeyExtent,Location>> getLocationProvider() { + return Iterables.transform(tabletLocs.entrySet(), new Function<Map.Entry<KeyExtent,TServerInstance>,Pair<KeyExtent,Location>>() { + + @Override + public Pair<KeyExtent,Location> apply(final Entry<KeyExtent,TServerInstance> input) { + return new Pair<KeyExtent,Location>(input.getKey(), new Location(input.getValue())); + } + }); + + } + + @Override + protected Function<KeyExtent,String> getPartitioner() { + return partitioner; + } + + @Override + protected long getWaitTime() { + return 0; + } + }; + + balance(balancer); + } + + public void balance(TabletBalancer balancer) { + + while (true) { + Set<KeyExtent> migrations = new HashSet<>(); + List<TabletMigration> migrationsOut = new ArrayList<>(); + SortedMap<TServerInstance,TabletServerStatus> current = new TreeMap<>(); + + for (TServerInstance tsi : tservers) { + current.put(tsi, new TabletServerStatus()); + } + + balancer.balance(current, migrations, migrationsOut); + + for (TabletMigration tabletMigration : migrationsOut) { + Assert.assertEquals(tabletLocs.get(tabletMigration.tablet), tabletMigration.oldServer); + Assert.assertTrue(tservers.contains(tabletMigration.newServer)); + + tabletLocs.put(tabletMigration.tablet, tabletMigration.newServer); + } + + if (migrationsOut.size() == 0) { + break; + } + } + + checkBalance(); + } + + void checkBalance() { + MapCounter<String> groupCounts = new MapCounter<>(); + 
Map<TServerInstance,MapCounter<String>> tserverGroupCounts = new HashMap<>(); + + for (Entry<KeyExtent,TServerInstance> entry : tabletLocs.entrySet()) { + String group = partitioner.apply(entry.getKey()); + TServerInstance loc = entry.getValue(); + + groupCounts.increment(group, 1); + MapCounter<String> tgc = tserverGroupCounts.get(loc); + if (tgc == null) { + tgc = new MapCounter<>(); + tserverGroupCounts.put(loc, tgc); + } + + tgc.increment(group, 1); + } + + Map<String,Integer> expectedCounts = new HashMap<>(); + + int totalExtra = 0; + for (String group : groupCounts.keySet()) { + long groupCount = groupCounts.get(group); + totalExtra += groupCount % tservers.size(); + expectedCounts.put(group, (int) (groupCount / tservers.size())); + } + + // The number of extra tablets from all groups that each tserver must have. + int expectedExtra = totalExtra / tservers.size(); + int maxExtraGroups = expectedExtra + ((totalExtra % tservers.size() > 0) ? 1 : 0); + + for (Entry<TServerInstance,MapCounter<String>> entry : tserverGroupCounts.entrySet()) { + MapCounter<String> tgc = entry.getValue(); + int tserverExtra = 0; + for (String group : groupCounts.keySet()) { + Assert.assertTrue(tgc.get(group) >= expectedCounts.get(group)); + Assert.assertTrue(tgc.get(group) <= expectedCounts.get(group) + 1); + tserverExtra += tgc.get(group) - expectedCounts.get(group); + } + + Assert.assertTrue(tserverExtra >= expectedExtra); + Assert.assertTrue(tserverExtra <= maxExtraGroups); + } + } + + Map<KeyExtent,TServerInstance> getLocations() { + return tabletLocs; + } + } + + @Test + public void testSingleGroup() { + + String tests[][] = new String[][] {new String[] {"a", "b", "c", "d"}, new String[] {"a", "b", "c"}, new String[] {"a", "b", "c", "d", "e"}, + new String[] {"a", "b", "c", "d", "e", "f", "g"}, new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}, + new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i"}, new String[] {"a"}}; + + for (String[] suffixes : tests) { + for (int 
maxTS = 1; maxTS <= 4; maxTS++) { + TabletServers tservers = new TabletServers(); + tservers = new TabletServers(); + int ts = 0; + for (String s : suffixes) { + tservers.addTablet("01" + s, "192.168.1." + ((ts++ % maxTS) + 1) + ":9997"); + } + + tservers.addTservers("192.168.1.2:9997", "192.168.1.3:9997", "192.168.1.4:9997"); + tservers.balance(); + tservers.balance(); + } + } + } + + @Test + public void testTwoGroups() { + String tests[][] = new String[][] {new String[] {"a", "b", "c", "d"}, new String[] {"a", "b", "c"}, new String[] {"a", "b", "c", "d", "e"}, + new String[] {"a", "b", "c", "d", "e", "f", "g"}, new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}, + new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i"}, new String[] {"a"}}; + + for (String[] suffixes1 : tests) { + for (String[] suffixes2 : tests) { + for (int maxTS = 1; maxTS <= 4; maxTS++) { + TabletServers tservers = new TabletServers(); + tservers = new TabletServers(); + int ts = 0; + for (String s : suffixes1) { + tservers.addTablet("01" + s, "192.168.1." + ((ts++ % maxTS) + 1) + ":9997"); + } + + for (String s : suffixes2) { + tservers.addTablet("02" + s, "192.168.1." 
+ ((ts++ % maxTS) + 1) + ":9997"); + } + + tservers.addTservers("192.168.1.2:9997", "192.168.1.3:9997", "192.168.1.4:9997"); + tservers.balance(); + tservers.balance(); + } + } + } + } + + @Test + public void testThreeGroups() { + String tests[][] = new String[][] {new String[] {"a", "b", "c", "d"}, new String[] {"a", "b", "c"}, new String[] {"a", "b", "c", "d", "e"}, + new String[] {"a", "b", "c", "d", "e", "f", "g"}, new String[] {"a", "b", "c", "d", "e", "f", "g", "h"}, + new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i"}, new String[] {"a"}}; + + for (String[] suffixes1 : tests) { + for (String[] suffixes2 : tests) { + for (String[] suffixes3 : tests) { + for (int maxTS = 1; maxTS <= 4; maxTS++) { + TabletServers tservers = new TabletServers(); + tservers = new TabletServers(); + int ts = 0; + for (String s : suffixes1) { + tservers.addTablet("01" + s, "192.168.1." + ((ts++ % maxTS) + 1) + ":9997"); + } + + for (String s : suffixes2) { + tservers.addTablet("02" + s, "192.168.1." + ((ts++ % maxTS) + 1) + ":9997"); + } + + for (String s : suffixes3) { + tservers.addTablet("03" + s, "192.168.1." + ((ts++ % maxTS) + 1) + ":9997"); + } + + tservers.addTservers("192.168.1.2:9997", "192.168.1.3:9997", "192.168.1.4:9997"); + tservers.balance(); + tservers.balance(); + } + } + } + } + } + + @Test + public void testManySingleTabletGroups() { + + for (int numGroups = 1; numGroups <= 13; numGroups++) { + for (int maxTS = 1; maxTS <= 4; maxTS++) { + TabletServers tservers = new TabletServers(); + tservers = new TabletServers(); + int ts = 0; + + for (int group = 1; group <= numGroups; group++) { + tservers.addTablet(String.format("%02d:p", group), "192.168.1." 
+ ((ts++ % maxTS) + 1) + ":9997"); + } + + tservers.addTservers("192.168.1.2:9997", "192.168.1.3:9997", "192.168.1.4:9997"); + + tservers.balance(); + tservers.balance(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/51fbfaf0/test/src/test/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java new file mode 100644 index 0000000..e32d9b1 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/functional/RegexGroupBalanceIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.accumulo.test.functional; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.master.balancer.RegexGroupBalancer; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +public class RegexGroupBalanceIT extends ConfigurableMacIT { + + @Override + public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception { + cfg.setNumTservers(4); + } + + @Test(timeout = 120000) + public void testBalancing() throws Exception { + Connector conn = getConnector(); + String tablename = getUniqueNames(1)[0]; + conn.tableOperations().create(tablename); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text("01a")); + splits.add(new Text("01m")); + splits.add(new Text("01z")); + + splits.add(new Text("02a")); + splits.add(new Text("02f")); + splits.add(new Text("02r")); + splits.add(new Text("02z")); + + splits.add(new Text("03a")); + splits.add(new Text("03f")); + splits.add(new Text("03m")); + splits.add(new Text("03r")); + + conn.tableOperations().setProperty(tablename, RegexGroupBalancer.REGEX_PROPERTY, "(\\d\\d).*"); + 
conn.tableOperations().setProperty(tablename, RegexGroupBalancer.DEFAUT_GROUP_PROPERTY, "03"); + conn.tableOperations().setProperty(tablename, RegexGroupBalancer.WAIT_TIME_PROPERTY, "50ms"); + conn.tableOperations().setProperty(tablename, Property.TABLE_LOAD_BALANCER.getKey(), RegexGroupBalancer.class.getName()); + + conn.tableOperations().addSplits(tablename, splits); + + while (true) { + Thread.sleep(250); + + Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); + + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 3); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 3, 3, 4); + + if (allGood) { + break; + } + } + + splits.clear(); + splits.add(new Text("01b")); + splits.add(new Text("01f")); + splits.add(new Text("01l")); + splits.add(new Text("01r")); + conn.tableOperations().addSplits(tablename, splits); + + while (true) { + Thread.sleep(250); + + Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); + + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 2, 4); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 4, 4, 4); + + if (allGood) { + break; + } + } + + // merge group 01 down to one tablet + conn.tableOperations().merge(tablename, null, new Text("01z")); + + while (true) { + Thread.sleep(250); + + Table<String,String,MutableInt> groupLocationCounts = getCounts(conn, tablename); + + boolean allGood = true; + allGood &= checkGroup(groupLocationCounts, "01", 1, 1, 1); + allGood &= checkGroup(groupLocationCounts, "02", 1, 1, 4); + allGood &= checkGroup(groupLocationCounts, "03", 1, 2, 4); + allGood &= checkTabletsPerTserver(groupLocationCounts, 2, 3, 4); + + if (allGood) { + break; + } + } + 
} + + private boolean checkTabletsPerTserver(Table<String,String,MutableInt> groupLocationCounts, int minTabletPerTserver, int maxTabletsPerTserver, + int totalTservser) { + // check that each tserver has between min and max tablets + for (Map<String,MutableInt> groups : groupLocationCounts.columnMap().values()) { + int sum = 0; + for (MutableInt mi : groups.values()) { + sum += mi.intValue(); + } + + if (sum < minTabletPerTserver || sum > maxTabletsPerTserver) { + return false; + } + } + + return groupLocationCounts.columnKeySet().size() == totalTservser; + } + + private boolean checkGroup(Table<String,String,MutableInt> groupLocationCounts, String group, int min, int max, int tsevers) { + Collection<MutableInt> counts = groupLocationCounts.row(group).values(); + if (counts.size() == 0) { + return min == 0 && max == 0 && tsevers == 0; + } + return min == Collections.min(counts).intValue() && max == Collections.max(counts).intValue() && counts.size() == tsevers; + } + + private Table<String,String,MutableInt> getCounts(Connector conn, String tablename) throws TableNotFoundException { + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + String tableId = conn.tableOperations().tableIdMap().get(tablename); + s.setRange(MetadataSchema.TabletsSection.getRange(tableId)); + + Table<String,String,MutableInt> groupLocationCounts = HashBasedTable.create(); + + for (Entry<Key,Value> entry : s) { + String group = entry.getKey().getRow().toString(); + if (group.endsWith("<")) { + group = "03"; + } else { + group = group.substring(tableId.length() + 1).substring(0, 2); + } + String loc = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()).toString(); + + MutableInt count = groupLocationCounts.get(group, loc); + if (count == null) { + count = new MutableInt(0); + groupLocationCounts.put(group, loc, count); + } + + count.increment(); + } + return 
groupLocationCounts; + } +}