ACCUMULO-2833 Add a StatusFormatter and configure it on metadata and replication tables
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e798d500 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e798d500 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e798d500 Branch: refs/heads/ACCUMULO-378 Commit: e798d50088933cbed7761b721a868d5f3da62935 Parents: 5917723 Author: Josh Elser <els...@apache.org> Authored: Thu May 22 13:39:08 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 22 13:39:08 2014 -0400 ---------------------------------------------------------------------- .../accumulo/core/protobuf/ProtobufUtil.java | 3 +- .../core/replication/StatusFormatter.java | 187 +++++++++++++++++++ .../server/replication/ReplicationTable.java | 47 ++++- .../server/util/ReplicationTableUtil.java | 48 ++++- .../server/util/ReplicationTableUtilTest.java | 6 + .../DistributedWorkQueueWorkAssigner.java | 2 +- .../replication/SequentialWorkAssigner.java | 2 +- .../replication/ReplicationProcessor.java | 6 +- 8 files changed, 287 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java b/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java index 30a0ac2..60eb840 100644 --- a/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java @@ -25,6 +25,7 @@ import com.google.protobuf.TextFormat; * Helper methods for interacting with Protocol Buffers and Accumulo */ public class ProtobufUtil { + private static final char LEFT_BRACKET = '[', RIGHT_BRACKET = ']'; public static Value toValue(GeneratedMessage msg) { return new 
Value(msg.toByteArray()); @@ -32,6 +33,6 @@ public class ProtobufUtil { public static String toString(GeneratedMessage msg) { // Too much typing - return TextFormat.shortDebugString(msg); + return LEFT_BRACKET + TextFormat.shortDebugString(msg) + RIGHT_BRACKET; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java new file mode 100644 index 0000000..bc04480 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.accumulo.core.replication;
+
+import java.text.DateFormat;
+import java.text.FieldPosition;
+import java.text.ParsePosition;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.accumulo.core.util.format.Formatter;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Parse and print the serialized protocol buffers used to track replication data.
+ * <p>
+ * Entries whose column family is one of the replication column families have their value decoded as a {@link Status} message. Every other entry is handed
+ * to {@link DefaultFormatter}, so this formatter can be configured on a table that holds other data too (e.g. accumulo.metadata).
+ */
+public class StatusFormatter implements Formatter {
+  private static final Logger log = LoggerFactory.getLogger(StatusFormatter.class);
+
+  /* column families whose values are expected to be serialized Status messages */
+  private static final Set<Text> REPLICATION_COLFAMS = Collections.unmodifiableSet(Sets.newHashSet(ReplicationSection.COLF, StatusSection.NAME,
+      WorkSection.NAME, OrderSection.NAME));
+
+  private Iterator<Entry<Key,Value>> iterator;
+  private boolean printTimestamps;
+
+  /* so a new date object doesn't get created for every record in the scan result */
+  private static final ThreadLocal<Date> tmpDate = new ThreadLocal<Date>() {
+    @Override
+    protected Date initialValue() {
+      return new Date();
+    }
+  };
+
+  /* one DateFormat per thread; it renders a timestamp as its raw milliseconds-since-epoch value */
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      return new DefaultDateFormat();
+    }
+
+    class DefaultDateFormat extends DateFormat {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public StringBuffer format(Date date, StringBuffer toAppendTo, FieldPosition fieldPosition) {
+        toAppendTo.append(Long.toString(date.getTime()));
+        return toAppendTo;
+      }
+
+      /**
+       * Parses the text starting at the given position as a decimal number of milliseconds since the epoch.
+       *
+       * @return the parsed date, or null (with the error index set on pos) when the text is not a valid long
+       */
+      @Override
+      public Date parse(String source, ParsePosition pos) {
+        int start = pos.getIndex();
+        try {
+          Date parsed = new Date(Long.parseLong(source.substring(start)));
+          // DateFormat.parse(String) reports a failure when the index was not advanced, so record that the text was consumed
+          pos.setIndex(source.length());
+          return parsed;
+        } catch (NumberFormatException e) {
+          // Follow the DateFormat contract: leave the index alone, flag where parsing failed and return null
+          pos.setErrorIndex(start);
+          return null;
+        }
+      }
+
+    }
+  };
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  /**
+   * Formats the next entry from the underlying iterator.
+   *
+   * @return the formatted entry; the value is rendered as a Status message when the column family is a replication column family
+   */
+  @Override
+  public String next() {
+    Entry<Key,Value> entry = iterator.next();
+    DateFormat timestampFormat = printTimestamps ? formatter.get() : null;
+
+    // If we expected this to be a protobuf, try to parse it, adding a message when it fails to parse
+    if (REPLICATION_COLFAMS.contains(entry.getKey().getColumnFamily())) {
+      Status status;
+      try {
+        status = Status.parseFrom(entry.getValue().get());
+      } catch (InvalidProtocolBufferException e) {
+        log.trace("Could not deserialize protocol buffer for {}", entry.getKey(), e);
+        // A null status makes formatEntry print a placeholder message instead of the value
+        status = null;
+      }
+
+      return formatEntry(entry.getKey(), status, timestampFormat);
+    } else {
+      // Otherwise, we're set on a table that contains other data too (e.g. accumulo.metadata)
+      // Just do the normal thing
+      return DefaultFormatter.formatEntry(entry, timestampFormat);
+    }
+  }
+
+  /**
+   * Renders a key and its Status as "row colfam:colqual visibility[ timestamp]", followed by a tab and the value.
+   *
+   * @param key
+   *          the key to print
+   * @param status
+   *          the deserialized value, or null when it could not be deserialized
+   * @param timestampFormat
+   *          format used for the key's timestamp, or null to leave the timestamp out
+   * @return the formatted entry
+   */
+  public String formatEntry(Key key, Status status, DateFormat timestampFormat) {
+    StringBuilder sb = new StringBuilder();
+    Text buffer = new Text();
+
+    // append row
+    key.getRow(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append column family
+    key.getColumnFamily(buffer);
+    appendText(sb, buffer).append(":");
+
+    // append column qualifier
+    key.getColumnQualifier(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append visibility expression
+    key.getColumnVisibility(buffer);
+    sb.append(new ColumnVisibility(buffer));
+
+    // append timestamp
+    if (timestampFormat != null) {
+      tmpDate.get().setTime(key.getTimestamp());
+      sb.append(" ").append(timestampFormat.format(tmpDate.get()));
+    }
+
+    sb.append("\t");
+    // append value
+    if (status != null) {
+      sb.append(ProtobufUtil.toString(status));
+    } else {
+      sb.append("Could not deserialize Status protocol buffer");
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Appends the valid bytes of the given Text to the builder, escaped as described in {@link #appendBytes(StringBuilder, byte[], int, int)}.
+   */
+  protected StringBuilder appendText(StringBuilder sb, Text t) {
+    return appendBytes(sb, t.getBytes(), 0, t.getLength());
+  }
+
+  /**
+   * Returns the bytes of the given Value, escaped as described in {@link #appendBytes(StringBuilder, byte[], int, int)}.
+   */
+  protected String getValue(Value v) {
+    StringBuilder sb = new StringBuilder();
+    return appendBytes(sb, v.get(), 0, v.get().length).toString();
+  }
+
+  /**
+   * Appends len bytes of ba, starting at offset, to the builder. A backslash is doubled, bytes in the range 32 to 126 are appended as characters, and every
+   * other byte is written as \x followed by two upper-case hexadecimal digits.
+   *
+   * @return the builder that was passed in
+   */
+  protected StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) {
+    for (int i = 0; i < len; i++) {
+      // mask to read the byte as an unsigned value
+      int c = 0xff & ba[offset + i];
+      if (c == '\\')
+        sb.append("\\\\");
+      else if (c >= 32 && c <= 126)
+        sb.append((char) c);
+      else
+        sb.append("\\x").append(String.format("%02X", c));
+    }
+    return sb;
+  }
+
+  @Override
+  public void remove() {
+    iterator.remove();
+  }
+
+  @Override
+  public void initialize(Iterable<Entry<Key,Value>> scanner, boolean printTimestamps) {
+    this.iterator = scanner.iterator();
+    this.printTimestamps = printTimestamps;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java index 622ec10..68651ab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; @@ -34,20 +35,23 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.iterators.Combiner; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.StatusFormatter; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; public class ReplicationTable extends org.apache.accumulo.core.client.replication.ReplicationTable { - private static final Logger log = 
Logger.getLogger(ReplicationTable.class); + private static final Logger log = LoggerFactory.getLogger(ReplicationTable.class); public static final String COMBINER_NAME = "statuscombiner"; @@ -56,6 +60,7 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio public static final String WORK_LG_NAME = WorkSection.NAME.toString(); public static final Set<Text> WORK_LG_COLFAMS = Collections.singleton(WorkSection.NAME); public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS); + public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName(); public static synchronized void create(Connector conn) { TableOperations tops = conn.tableOperations(); @@ -146,6 +151,44 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio } } + // Make sure the StatusFormatter is set on the metadata table + Iterable<Entry<String,String>> properties; + try { + properties = tops.getProperties(NAME); + } catch (AccumuloException | TableNotFoundException e) { + log.error("Could not fetch table properties on replication table", e); + return false; + } + + boolean formatterConfigured = false; + for (Entry<String,String> property : properties) { + if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) { + if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) { + log.info("Setting formatter for {} from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME); + try { + tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Could not set formatter on replication table", e); + return false; + } + } + + formatterConfigured = true; + + // Don't need to keep iterating over the properties after we found the one we were looking for + break; + } + } + + if (!formatterConfigured) { + try { + 
tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Could not set formatter on replication table", e); + return false; + } + } + log.debug("Successfully configured replication table"); return true; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java index 45f8fea..2a9774d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -31,6 +32,7 @@ import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.Writer; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -40,6 +42,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.StatusFormatter; import 
org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -49,7 +52,8 @@ import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.replication.StatusCombiner; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * provides a reference to the replication table for updates by tablet servers @@ -57,9 +61,10 @@ import org.apache.log4j.Logger; public class ReplicationTableUtil { private static Map<Credentials,Writer> writers = new HashMap<Credentials,Writer>(); - private static final Logger log = Logger.getLogger(ReplicationTableUtil.class); + private static final Logger log = LoggerFactory.getLogger(ReplicationTableUtil.class); public static final String COMBINER_NAME = "replcombiner"; + public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName(); private ReplicationTableUtil() {} @@ -116,6 +121,37 @@ public class ReplicationTableUtil { throw new RuntimeException(e); } } + + // Make sure the StatusFormatter is set on the metadata table + Iterable<Entry<String,String>> properties; + try { + properties = tops.getProperties(tableName); + } catch (AccumuloException | TableNotFoundException e) { + throw new RuntimeException(e); + } + + for (Entry<String,String> property : properties) { + if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) { + if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) { + log.info("Setting formatter for {} from {} to {}", tableName, property.getValue(), STATUS_FORMATTER_CLASS_NAME); + try { + tops.setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + + // Don't need to keep iterating over 
the properties after we found the one we were looking for + return; + } + } + + // Set the formatter on the table because it wasn't already there + try { + tops.setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } } /** @@ -128,13 +164,13 @@ public class ReplicationTableUtil { t.update(m); return; } catch (AccumuloException e) { - log.error(e, e); + log.error(e.toString(), e); } catch (AccumuloSecurityException e) { - log.error(e, e); + log.error(e.toString(), e); } catch (ConstraintViolationException e) { - log.error(e, e); + log.error(e.toString(), e); } catch (TableNotFoundException e) { - log.error(e, e); + log.error(e.toString(), e); } UtilWaitThread.sleep(1000); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java index 12295ef..88f13c9 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java @@ -28,6 +28,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.UUID; import org.apache.accumulo.core.client.Connector; @@ -36,6 +37,7 @@ import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.Writer; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; import 
org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; @@ -142,6 +144,10 @@ public class ReplicationTableUtilTest { tops.attachIterator(myMetadataTable, combiner); expectLastCall().once(); + expect(tops.getProperties(myMetadataTable)).andReturn((Iterable<Entry<String,String>>) Collections.<Entry<String,String>> emptyList()); + tops.setProperty(myMetadataTable, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTableUtil.STATUS_FORMATTER_CLASS_NAME); + expectLastCall().once(); + replay(conn, tops); ReplicationTableUtil.configureMetadataTable(conn, myMetadataTable); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java index f04f3e8..4f883af 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java @@ -255,7 +255,7 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner { log.trace("Not re-queueing work for {}", key); } } else { - log.debug("Not queueing work for {} because [{}] doesn't need replication", file, TextFormat.shortDebugString(status)); + log.debug("Not queueing work for {} because {} doesn't need replication", file, TextFormat.shortDebugString(status)); } } } finally { http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java 
---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java index 7609ca5..f2d110a 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java @@ -297,7 +297,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner { log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName()); } } else { - log.debug("Not queueing work for {} because [{}] doesn't need replication", file, ProtobufUtil.toString(status)); + log.debug("Not queueing work for {} because {} doesn't need replication", file, ProtobufUtil.toString(status)); if (key.equals(keyBeingReplicated)) { log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName()); queuedWorkForPeer.remove(sourceTableId); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java index 9fb7fe5..d451991 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java @@ -123,7 +123,7 @@ public class ReplicationProcessor implements Processor { // Replicate that sucker Status replicatedStatus = replica.replicate(filePath, status, 
target); - log.debug("Completed replication of {} to {}, with new Status [{}]", filePath, target, ProtobufUtil.toString(replicatedStatus)); + log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus)); // If we got a different status if (!replicatedStatus.equals(status)) { @@ -132,7 +132,7 @@ public class ReplicationProcessor implements Processor { return; } - log.debug("Did not replicate any new data for {} to {}, (was [{}], now is [{}])", filePath, target, TextFormat.shortDebugString(status), + log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status), TextFormat.shortDebugString(replicatedStatus)); // otherwise, we didn't actually replicate because there was error sending the data @@ -175,7 +175,7 @@ public class ReplicationProcessor implements Processor { try { Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); BatchWriter bw = ReplicationTable.getBatchWriter(conn); - log.debug("Recording new status for {}, [{}]", filePath.toString(), TextFormat.shortDebugString(status)); + log.debug("Recording new status for {}, {}", filePath.toString(), TextFormat.shortDebugString(status)); Mutation m = new Mutation(filePath.toString()); WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status)); bw.addMutation(m);