Repository: accumulo
Updated Branches:
  refs/heads/master d97a6a2b1 -> 8f4afc5a3


ACCUMULO-3601 cherry picked migration fix to 1.5


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a1fe8177
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a1fe8177
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a1fe8177

Branch: refs/heads/master
Commit: a1fe8177dc0adf2696c5e4f19e48150eb94de600
Parents: bd981c8
Author: Eric Newton <Eric Newton>
Authored: Thu Feb 19 15:38:48 2015 -0500
Committer: Eric Newton <Eric Newton>
Committed: Thu Feb 19 16:50:36 2015 -0500

----------------------------------------------------------------------
 .../apache/accumulo/server/master/Master.java   |  5 +++
 .../server/master/state/CurrentState.java       |  3 ++
 .../master/state/MetaDataTableScanner.java      |  2 +
 .../master/state/TabletStateChangeIterator.java | 40 ++++++++++++++++++++
 .../accumulo/server/master/TestMergeState.java  |  5 +++
 5 files changed, 55 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java 
b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index cbb80ba..abf7468 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -2629,4 +2629,9 @@ public class Master implements LiveTServerSet.Listener, 
TableObserver, CurrentSt
   public void updateRecoveryInProgress(String file) {
     recoveriesInProgress.add(file);
   }
+
+  @Override
+  public Collection<KeyExtent> migrations() {
+    return migrations.keySet();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
 
b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
index 501d66a..b07a931 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.server.master.state;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.accumulo.core.data.KeyExtent;
+
 public interface CurrentState {
 
   Set<String> onlineTables();
@@ -27,4 +29,5 @@ public interface CurrentState {
 
   Collection<MergeInfo> merges();
 
+  Collection<KeyExtent> migrations();
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
 
b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index 50609ec..65220dc 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -76,6 +76,7 @@ public class MetaDataTableScanner implements 
Iterator<TabletLocationState> {
       TabletStateChangeIterator.setCurrentServers(tabletChange, 
state.onlineTabletServers());
       TabletStateChangeIterator.setOnlineTables(tabletChange, 
state.onlineTables());
       TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+      TabletStateChangeIterator.setMigrations(tabletChange, 
state.migrations());
     }
     scanner.addScanIterator(tabletChange);
   }
@@ -91,6 +92,7 @@ public class MetaDataTableScanner implements 
Iterator<TabletLocationState> {
     }
   }
 
+  @Override
   protected void finalize() {
     close();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git 
a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
 
b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
index a3402df..7db9da1 100644
--- 
a/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ 
b/server/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -49,11 +50,13 @@ public class TabletStateChangeIterator extends 
SkippingIterator {
   private static final String SERVERS_OPTION = "servers";
   private static final String TABLES_OPTION = "tables";
   private static final String MERGES_OPTION = "merges";
+  private static final String MIGRATIONS_OPTION = "migrations";
   // private static final Logger log = 
Logger.getLogger(TabletStateChangeIterator.class);
 
   Set<TServerInstance> current;
   Set<String> onlineTables;
   Map<Text,MergeInfo> merges;
+  Set<KeyExtent> migrations;
 
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
@@ -61,6 +64,26 @@ public class TabletStateChangeIterator extends 
SkippingIterator {
     current = parseServers(options.get(SERVERS_OPTION));
     onlineTables = parseTables(options.get(TABLES_OPTION));
     merges = parseMerges(options.get(MERGES_OPTION));
+    migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+  }
+
+  private Set<KeyExtent> parseMigrations(String migrations) {
+    if (migrations == null)
+      return Collections.emptySet();
+    try {
+      Set<KeyExtent> result = new HashSet<KeyExtent>();
+      DataInputBuffer buffer = new DataInputBuffer();
+      byte[] data = Base64.decodeBase64(migrations.getBytes(UTF_8));
+      buffer.reset(data, data.length);
+      while (buffer.available() > 0) {
+        KeyExtent extent = new KeyExtent();
+        extent.readFields(buffer);
+        result.add(extent);
+      }
+      return result;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
   }
 
   private Set<String> parseTables(String tables) {
@@ -133,6 +156,10 @@ public class TabletStateChangeIterator extends 
SkippingIterator {
         // could make this smarter by only returning if the tablet is involved 
in the merge
         return;
       }
+      // always return the information for migrating tablets
+      if (migrations.contains(tls.extent)) {
+        return;
+      }
 
       // is the table supposed to be online or offline?
       boolean shouldBeOnline = 
onlineTables.contains(tls.extent.getTableId().toString());
@@ -194,4 +221,17 @@ public class TabletStateChangeIterator extends 
SkippingIterator {
     cfg.addOption(MERGES_OPTION, encoded);
   }
 
+  public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> 
migrations) {
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    try {
+      for (KeyExtent  extent : migrations) {
+        extent.write(buffer);
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    String encoded = Base64.encodeBase64String(Arrays.copyOf(buffer.getData(), 
buffer.getLength()));
+    cfg.addOption(MIGRATIONS_OPTION, encoded);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/a1fe8177/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java 
b/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
index bb6294b..bf58084 100644
--- a/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
+++ b/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
@@ -77,6 +77,11 @@ public class TestMergeState {
     public Collection<MergeInfo> merges() {
       return Collections.singleton(mergeInfo);
     }
+
+    @Override
+    public Collection<KeyExtent> migrations() {
+      return Collections.emptyList();
+    }
   }
 
   private static void update(Connector c, Mutation m) throws 
TableNotFoundException, MutationsRejectedException {

Reply via email to