ACCUMULO-378 Can't use '_' as the row separator for Order records.

The ULongLexicoder *might* produce encoded bytes that equal '_', which
would break the logic that splits the row key. Switch the separator to \x00
instead and locate the last instance of it when splitting the row.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/49fc9855
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/49fc9855
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/49fc9855

Branch: refs/heads/ACCUMULO-378
Commit: 49fc9855f996ae0f5b3cc20e03e77ea8f707d640
Parents: 0ff0e02
Author: Josh Elser <els...@apache.org>
Authored: Wed May 28 20:07:52 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed May 28 20:58:43 2014 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationSchema.java     | 30 ++++++++++++++++----
 .../core/replication/ReplicationSchemaTest.java | 12 ++++++++
 .../accumulo/master/replication/WorkDriver.java |  6 +++-
 3 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 8699bd2..ab350e6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -153,11 +153,11 @@ public class ReplicationSchema {
    * Holds the order in which files needed for replication were closed. The 
intent is to be able to guarantee that files which were closed earlier were
    * replicated first and we don't replay data in the wrong order on our peers
    * <p>
-   * 
<code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL 
order:source_table_id [] -> Status Protobuf</code>
+   * 
<code>encodedTimeOfClosure\x00hdfs://localhost:8020/accumulo/wal/tserver+port/WAL
 order:source_table_id [] -> Status Protobuf</code>
    */
   public static class OrderSection {
     public static final Text NAME = new Text("order");
-    public static final String ROW_SEPARATOR = "_";
+    public static final Text ROW_SEPARATOR = new Text(new byte[]{0});
     private static final ULongLexicoder longEncoder = new ULongLexicoder();
 
     /**
@@ -218,10 +218,10 @@ public class ReplicationSchema {
       Path p = new Path(file);
       String pathString = p.toUri().toString();
 
-      log.info("Normalized {} into {}", file, pathString);
+      log.trace("Normalized {} into {}", file, pathString);
 
       // Append the file as a suffix to the row
-      row.append((ROW_SEPARATOR + pathString).getBytes(), 0, 
pathString.length() + ROW_SEPARATOR.length());
+      row.append((ROW_SEPARATOR + pathString).getBytes(), 0, 
pathString.length() + ROW_SEPARATOR.getLength());
 
       // Make the mutation and add the column update
       return new Mutation(row);
@@ -249,7 +249,16 @@ public class ReplicationSchema {
 
     public static long getTimeClosed(Key k, Text buff) {
       k.getRow(buff);
-      int offset = buff.find(ROW_SEPARATOR);
+      int offset = 0;
+      // find the last offset
+      while (true) {
+        int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+        if (-1 == nextOffset) {
+          break;
+        }
+        offset = nextOffset;
+      }
+
       if (-1 == offset) {
         throw new IllegalArgumentException("Row does not contain expected 
separator for OrderSection");
       }
@@ -266,7 +275,16 @@ public class ReplicationSchema {
 
     public static String getFile(Key k, Text buff) {
       k.getRow(buff);
-      int offset = buff.find(ROW_SEPARATOR);
+      int offset = 0;
+      // find the last offset
+      while (true) {
+        int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+        if (-1 == nextOffset) {
+          break;
+        }
+        offset = nextOffset;
+      }
+
       if (-1 == offset) {
         throw new IllegalArgumentException("Row does not contain expected 
separator for OrderSection");
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
index d321153..3822641 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
@@ -119,4 +119,16 @@ public class ReplicationSchemaTest {
     Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
     Assert.assertEquals(now, OrderSection.getTimeClosed(k, buff));
   }
+
+  @Test
+  public void separatorDoesntInterferWithSplit() {
+    Text buff = new Text();
+    // Cycle through 2*128 values
+    for (long i = 1; i < 258; i++) {
+      Mutation m = OrderSection.createMutation("/accumulo/file", i);
+      Key k = new Key(new Text(m.getRow()));
+      Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
+      Assert.assertEquals(i, OrderSection.getTimeClosed(k, buff));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index 8c3e3e3..43a74a8 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -93,7 +93,11 @@ public class WorkDriver extends Daemon {
 
     while (master.stillMaster()) {
       // Assign the work using the configured implementation
-      assigner.assignWork();
+      try {
+        assigner.assignWork();
+      } catch (Exception e) {
+        log.error("Error while assigning work", e);
+      }
 
       long sleepTime = 
conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
       log.debug("Sleeping {} ms before next work assignment", sleepTime);

Reply via email to