Repository: accumulo Updated Branches: refs/heads/1.7 07672bdc3 -> ef902f276 refs/heads/master ca5928e20 -> 342b6b302
ACCUMULO-3850 Fix some log levels and add more logging to replica system Try to add some more logging which will help with debugging replication when it doesn't work as expected. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef902f27 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef902f27 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef902f27 Branch: refs/heads/1.7 Commit: ef902f27658f3bbffe3eb654d96d8acf7e6a72cc Parents: 07672bd Author: Josh Elser <josh.el...@gmail.com> Authored: Tue May 26 14:27:25 2015 -0400 Committer: Josh Elser <josh.el...@gmail.com> Committed: Tue May 26 17:07:25 2015 -0400 ---------------------------------------------------------------------- .../tserver/replication/AccumuloReplicaSystem.java | 16 ++++++++++------ .../tserver/replication/ReplicationProcessor.java | 4 +++- 2 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef902f27/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index be68210..c28f341 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -362,7 +362,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { final long sizeLimit, final String remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper, final UserGroupInformation accumuloUgi) throws TTransportException, AccumuloException, 
AccumuloSecurityException { - log.info("Replication WAL to peer tserver"); + log.debug("Replication WAL to peer tserver"); final Set<Integer> tids; final DataInputStream input; Span span = Trace.start("Read WAL header"); @@ -396,7 +396,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { span.stop(); } - log.info("Skipping unwanted data in WAL"); + log.debug("Skipping unwanted data in WAL"); span = Trace.start("Consume WAL prefix"); span.data("file", p.toString()); try { @@ -410,7 +410,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { span.stop(); } - log.info("Sending batches of data to peer tserver"); + log.debug("Sending batches of data to peer tserver"); Status lastStatus = status, currentStatus = status; final AtomicReference<Exception> exceptionRef = new AtomicReference<>(); @@ -484,6 +484,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem { span.stop(); } + log.debug("Recorded updated status for {}: {}", p, currentStatus); + // If we don't have any more work, just quit if (!StatusUtil.isWorkRequired(currentStatus)) { return currentStatus; @@ -528,15 +530,17 @@ public class AccumuloReplicaSystem implements ReplicaSystem { public ReplicationStats execute(Client client) throws Exception { WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids); - log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == edits.entriesConsumed) ? "all" - : edits.entriesConsumed, edits.sizeInBytes, p); + log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", + (Long.MAX_VALUE == edits.entriesConsumed) ? 
"all remaining" : edits.entriesConsumed, edits.sizeInBytes, p); // If we have some edits to send if (0 < edits.walEdits.getEditsSize()) { - log.info("Sending {} edits", edits.walEdits.getEditsSize()); + log.debug("Sending {} edits", edits.walEdits.getEditsSize()); long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds); if (entriesReplicated != edits.numUpdates) { log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated); + } else { + log.debug("Replicated {} edits", entriesReplicated); } // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef902f27/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java index 8b825f8..1cd6131 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java @@ -131,7 +131,9 @@ public class ReplicationProcessor implements Processor { log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName()); - replica.replicate(filePath, status, target, getHelper()); + Status newStatus = replica.replicate(filePath, status, target, getHelper()); + + log.debug("Finished replicating {}. Original status: {}, New status: {}", filePath, status, newStatus); } protected ReplicaSystemHelper getHelper() {