ACCUMULO-378 Can't use '_' as the row separator for Order records. The ULongLexicoder *might* create bytes that actually equal the '_', which will mess up the splitting logic of the row key. Switch it to a \x00 instead and find the last instance of it.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/49fc9855 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/49fc9855 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/49fc9855 Branch: refs/heads/ACCUMULO-378 Commit: 49fc9855f996ae0f5b3cc20e03e77ea8f707d640 Parents: 0ff0e02 Author: Josh Elser <els...@apache.org> Authored: Wed May 28 20:07:52 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed May 28 20:58:43 2014 -0400 ---------------------------------------------------------------------- .../core/replication/ReplicationSchema.java | 30 ++++++++++++++++---- .../core/replication/ReplicationSchemaTest.java | 12 ++++++++ .../accumulo/master/replication/WorkDriver.java | 6 +++- 3 files changed, 41 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java index 8699bd2..ab350e6 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java @@ -153,11 +153,11 @@ public class ReplicationSchema { * Holds the order in which files needed for replication were closed. The intent is to be able to guarantee that files which were closed earlier were * replicated first and we don't replay data in the wrong order on our peers * <p> - * <code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code> + * <code>encodedTimeOfClosure\x00hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code> */ public static class OrderSection { public static final Text NAME = new Text("order"); - public static final String ROW_SEPARATOR = "_"; + public static final Text ROW_SEPARATOR = new Text(new byte[]{0}); private static final ULongLexicoder longEncoder = new ULongLexicoder(); /** @@ -218,10 +218,10 @@ public class ReplicationSchema { Path p = new Path(file); String pathString = p.toUri().toString(); - log.info("Normalized {} into {}", file, pathString); + log.trace("Normalized {} into {}", file, pathString); // Append the file as a suffix to the row - row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length()); + row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.getLength()); // Make the mutation and add the column update return new Mutation(row); @@ -249,7 +249,16 @@ public class ReplicationSchema { public static long getTimeClosed(Key k, Text buff) { k.getRow(buff); - int offset = buff.find(ROW_SEPARATOR); + int offset = 0; + // find the last offset + while (true) { + int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1); + if (-1 == nextOffset) { + break; + } + offset = nextOffset; + } + if (-1 == offset) { throw new IllegalArgumentException("Row does not contain expected separator for OrderSection"); } @@ -266,7 +275,16 @@ public class ReplicationSchema { public static String getFile(Key k, Text buff) { k.getRow(buff); - int offset = buff.find(ROW_SEPARATOR); + int offset = 0; + // find the last offset + while (true) { + int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1); + if (-1 == nextOffset) { + break; + } + offset = nextOffset; + } + if (-1 == offset) { throw new IllegalArgumentException("Row does not contain expected separator for OrderSection"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java index d321153..3822641 100644 --- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java +++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java @@ -119,4 +119,16 @@ public class ReplicationSchemaTest { Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff)); Assert.assertEquals(now, OrderSection.getTimeClosed(k, buff)); } + + @Test + public void separatorDoesntInterferWithSplit() { + Text buff = new Text(); + // Cycle through 2*128 values + for (long i = 1; i < 258; i++) { + Mutation m = OrderSection.createMutation("/accumulo/file", i); + Key k = new Key(new Text(m.getRow())); + Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff)); + Assert.assertEquals(i, OrderSection.getTimeClosed(k, buff)); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java index 8c3e3e3..43a74a8 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java @@ -93,7 +93,11 @@ public class WorkDriver extends Daemon { while (master.stillMaster()) { // Assign the work using the configured implementation - assigner.assignWork(); + try { + assigner.assignWork(); + } catch (Exception e) { + log.error("Error while assigning work", e); + } long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP); log.debug("Sleeping {} ms before next work assignment", sleepTime);