Repository: accumulo Updated Branches: refs/heads/1.5 bd981c886 -> a1fe8177d
ACCUMULO-3601 cherry picked migration fix to 1.5 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a1fe8177 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a1fe8177 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a1fe8177 Branch: refs/heads/1.5 Commit: a1fe8177dc0adf2696c5e4f19e48150eb94de600 Parents: bd981c8 Author: Eric Newton <Eric Newton> Authored: Thu Feb 19 15:38:48 2015 -0500 Committer: Eric Newton <Eric Newton> Committed: Thu Feb 19 16:50:36 2015 -0500 ---------------------------------------------------------------------- .../apache/accumulo/server/master/Master.java | 5 +++ .../server/master/state/CurrentState.java | 3 ++ .../master/state/MetaDataTableScanner.java | 2 + .../master/state/TabletStateChangeIterator.java | 40 ++++++++++++++++++++ .../accumulo/server/master/TestMergeState.java | 5 +++ 5 files changed, 55 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/Master.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java index cbb80ba..abf7468 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/Master.java +++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java @@ -2629,4 +2629,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt public void updateRecoveryInProgress(String file) { recoveriesInProgress.add(file); } + + @Override + public Collection<KeyExtent> migrations() { + return migrations.keySet(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java index 501d66a..b07a931 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java +++ b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java @@ -19,6 +19,8 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; import java.util.Set; +import org.apache.accumulo.core.data.KeyExtent; + public interface CurrentState { Set<String> onlineTables(); @@ -27,4 +29,5 @@ public interface CurrentState { Collection<MergeInfo> merges(); + Collection<KeyExtent> migrations(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index 50609ec..65220dc 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -76,6 +76,7 @@ public class MetaDataTableScanner implements Iterator<TabletLocationState> { TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers()); TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables()); TabletStateChangeIterator.setMerges(tabletChange, state.merges()); + TabletStateChangeIterator.setMigrations(tabletChange, state.migrations()); } scanner.addScanIterator(tabletChange); } @@ -91,6 +92,7 @@ public class MetaDataTableScanner implements Iterator<TabletLocationState> { } } + @Override protected void finalize() { close(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java index a3402df..7db9da1 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java +++ b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -49,11 +50,13 @@ public class TabletStateChangeIterator extends SkippingIterator { private static final String SERVERS_OPTION = "servers"; private static final String TABLES_OPTION = "tables"; private static final String MERGES_OPTION = "merges"; + private static final String MIGRATIONS_OPTION = "migrations"; // private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class); Set<TServerInstance> current; Set<String> onlineTables; Map<Text,MergeInfo> merges; + Set<KeyExtent> migrations; @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { @@ -61,6 +64,26 @@ public class TabletStateChangeIterator extends SkippingIterator { current = parseServers(options.get(SERVERS_OPTION)); onlineTables = parseTables(options.get(TABLES_OPTION)); merges = parseMerges(options.get(MERGES_OPTION)); + migrations = parseMigrations(options.get(MIGRATIONS_OPTION)); + } + + private Set<KeyExtent> parseMigrations(String migrations) { + if (migrations == null) + return Collections.emptySet(); + try { + Set<KeyExtent> result = new HashSet<KeyExtent>(); + DataInputBuffer buffer = new DataInputBuffer(); + byte[] data = Base64.decodeBase64(migrations.getBytes(UTF_8)); + buffer.reset(data, data.length); + while (buffer.available() > 0) { + KeyExtent extent = new KeyExtent(); + extent.readFields(buffer); + result.add(extent); + } + return result; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } private Set<String> parseTables(String tables) { @@ -133,6 +156,10 @@ public class TabletStateChangeIterator extends SkippingIterator { // could make this smarter by only returning if the tablet is involved in the merge return; } + // always return the information for migrating tablets + if (migrations.contains(tls.extent)) { + return; + } // is the table supposed to be online or offline? boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString()); @@ -194,4 +221,17 @@ public class TabletStateChangeIterator extends SkippingIterator { cfg.addOption(MERGES_OPTION, encoded); } + public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + for (KeyExtent extent : migrations) { + extent.write(buffer); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + String encoded = Base64.encodeBase64String(Arrays.copyOf(buffer.getData(), buffer.getLength())); + cfg.addOption(MIGRATIONS_OPTION, encoded); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java b/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java index bb6294b..bf58084 100644 --- a/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java +++ b/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java @@ -77,6 +77,11 @@ public class TestMergeState { public Collection<MergeInfo> merges() { return Collections.singleton(mergeInfo); } + + @Override + public Collection<KeyExtent> migrations() { + return Collections.emptyList(); + } } private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {