ACCUMULO-2209 reset the batch writer on any exception
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/75f27e5d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/75f27e5d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/75f27e5d Branch: refs/heads/master Commit: 75f27e5d17b2b667438b3ca4ecd59e76d0ec206f Parents: c42a511 99c4361 Author: Eric Newton <eric.new...@gmail.com> Authored: Fri Jan 17 10:23:37 2014 -0500 Committer: Eric Newton <eric.new...@gmail.com> Committed: Fri Jan 17 10:23:37 2014 -0500 ---------------------------------------------------------------------- .../main/java/org/apache/accumulo/server/trace/TraceServer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/75f27e5d/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java ---------------------------------------------------------------------- diff --cc server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java index 77cdb17,0000000..0777d03 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java +++ b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java @@@ -1,293 -1,0 +1,292 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.trace; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.nio.channels.ServerSocketChannel; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; - import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileUtil; +import org.apache.accumulo.core.iterators.user.AgeOffFilter; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.trace.TraceFormatter; +import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; 
+import org.apache.accumulo.server.Accumulo;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.start.classloader.AccumuloClassLoader;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.thrift.RemoteSpan;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
+import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+/**
+ * Trace collection server. Accepts {@link RemoteSpan}s over the Thrift {@code SpanReceiver} interface and stores them in the table named by
+ * {@link Property#TRACE_TABLE}, registering its own address in ZooKeeper under {@link Constants#ZTRACERS}.
+ *
+ * <p>
+ * Threading: {@link Receiver#span} runs on the {@link TThreadPoolServer} worker threads and {@link #flush()} runs on the {@link SimpleTimer} thread. Both use
+ * the single shared {@link #writer}, which is only ever replaced by the synchronized {@link #resetWriter()}.
+ */
+public class TraceServer implements Watcher {
+
+  final private static Logger log = Logger.getLogger(TraceServer.class);
+
+  /** Max latency, in seconds, configured on the initial batch writer and on every replacement writer. */
+  private static final long WRITER_MAX_LATENCY_SECONDS = 5;
+
+  /** Age-off TTL given to {@link AgeOffFilter} when this server creates the trace table: 7 * 24 * 60 * 60 * 1000 (seven days expressed in milliseconds). */
+  private static final long TRACE_TABLE_TTL = 7 * 24 * 60 * 60 * 1000L;
+
+  final private ServerConfiguration serverConfiguration;
+  final private TServer server;
+  // Read by the Thrift worker threads and the flush timer, replaced by resetWriter(); volatile so every thread sees the most recently installed writer.
+  private volatile BatchWriter writer = null;
+  private Connector connector;
+  final String table;
+
+  /** Adds one cell to {@code m} whose value is the first {@code len} bytes of {@code bytes}. */
+  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
+    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
+  }
+
+  /** Builds the writer configuration shared by the constructor and {@link #resetWriter()}, so replacement writers keep the same max latency. */
+  private static BatchWriterConfig newWriterConfig() {
+    return new BatchWriterConfig().setMaxLatency(WRITER_MAX_LATENCY_SECONDS, TimeUnit.SECONDS);
+  }
+
+  /** Write-only, in-memory Thrift transport used to serialize a span into a byte array; {@code read} always returns 0. */
+  static class ByteArrayTransport extends TTransport {
+    TByteArrayOutputStream out = new TByteArrayOutputStream();
+
+    @Override
+    public boolean isOpen() {
+      return true;
+    }
+
+    @Override
+    public void open() throws TTransportException {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public int read(byte[] buf, int off, int len) {
+      return 0;
+    }
+
+    @Override
+    public void write(byte[] buf, int off, int len) throws TTransportException {
+      out.write(buf, off, len);
+    }
+
+    /** Returns the underlying buffer; callers use only its first {@link #len()} bytes. */
+    public byte[] get() {
+      return out.get();
+    }
+
+    /** Returns the number of bytes written so far. */
+    public int len() {
+      return out.len();
+    }
+  }
+
+  /**
+   * Thrift handler that converts each received span into mutations on the trace table:
+   * <ul>
+   * <li>row = hex trace id, family {@code span}, qualifier {@code <hex parent id>:<hex span id>} (parent part empty for root spans), value = the span in
+   * Thrift compact encoding;</li>
+   * <li>row = {@code idx:<service>:<hex start>}, family = description, qualifier = sender, value = {@code <hex trace id>:<hex duration>};</li>
+   * <li>root spans only: row = {@code start:<hex start>}, family {@code id}, qualifier = hex trace id, value = the encoded span.</li>
+   * </ul>
+   * A write failure is logged and the span is dropped. A writer left broken by such a failure is replaced by the next {@link TraceServer#flush()}, which
+   * resets the writer whenever flushing throws.
+   */
+  class Receiver implements Iface {
+    @Override
+    public void span(RemoteSpan s) throws TException {
+      String idString = Long.toHexString(s.traceId);
+      String startString = Long.toHexString(s.start);
+      Mutation spanMutation = new Mutation(new Text(idString));
+      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
+      long diff = s.stop - s.start;
+      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
+      ByteArrayTransport transport = new ByteArrayTransport();
+      TCompactProtocol protocol = new TCompactProtocol(transport);
+      s.write(protocol);
+      String parentString = Long.toHexString(s.parentId);
+      if (s.parentId == Span.ROOT_SPAN_ID)
+        parentString = "";
+      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
+      // Map the root span to time so we can look up traces by time
+      Mutation timeMutation = null;
+      if (s.parentId == Span.ROOT_SPAN_ID) {
+        timeMutation = new Mutation(new Text("start:" + startString));
+        put(timeMutation, "id", idString, transport.get(), transport.len());
+      }
+      try {
+        // Read the shared field once so the null check and the writes below use the same writer even if the flush timer swaps it meanwhile.
+        BatchWriter current = writer;
+        if (current == null) {
+          // null means the last resetWriter() could not create a writer; try again now.
+          resetWriter();
+          current = writer;
+        }
+        if (current == null)
+          return; // still no writer: drop this span (resetWriter() has already logged the cause)
+        current.addMutation(spanMutation);
+        current.addMutation(indexMutation);
+        if (timeMutation != null)
+          current.addMutation(timeMutation);
+      } catch (Exception ex) {
+        log.error("Unable to write mutation to table: " + spanMutation, ex);
+      }
+    }
+
+  }
+
+  /**
+   * Connects as the configured trace user and makes sure the trace table exists, creating it with an "ageoff" {@link AgeOffFilter} if it does not. Always
+   * sets the table's formatter to {@link TraceFormatter}. This setup is retried every second until it succeeds. The constructor then binds the Thrift server
+   * to {@link Property#TRACE_PORT}, registers {@code hostname:port} in ZooKeeper and opens the shared batch writer.
+   *
+   * @param serverConfiguration source of the instance and of the trace-related properties
+   * @param hostname host name advertised in ZooKeeper together with the bound port
+   */
+  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
+    this.serverConfiguration = serverConfiguration;
+    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
+    table = conf.get(Property.TRACE_TABLE);
+    while (true) {
+      try {
+        String principal = conf.get(Property.TRACE_USER);
+        AuthenticationToken at;
+        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
+        if (loginMap.isEmpty()) {
+          // No token properties configured: fall back to a password token.
+          Property p = Property.TRACE_PASSWORD;
+          at = new PasswordToken(conf.get(p).getBytes());
+        } else {
+          Properties props = new Properties();
+          AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
+              .newInstance();
+
+          // Pass each token property on with the prefix key (and the one character that follows it) removed.
+          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
+          for (Entry<String,String> entry : loginMap.entrySet()) {
+            props.put(entry.getKey().substring(prefixLength), entry.getValue());
+          }
+
+          token.init(props);
+
+          at = token;
+        }
+
+        connector = serverConfiguration.getInstance().getConnector(principal, at);
+        if (!connector.tableOperations().exists(table)) {
+          connector.tableOperations().create(table);
+          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
+          AgeOffFilter.setTTL(setting, TRACE_TABLE_TTL);
+          connector.tableOperations().attachIterator(table, setting);
+        }
+        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
+        break;
+      } catch (Exception ex) {
+        log.info("Waiting to checking/create the trace table.", ex);
+        UtilWaitThread.sleep(1000);
+      }
+    }
+
+    int port = conf.getPort(Property.TRACE_PORT);
+    final ServerSocket sock = ServerSocketChannel.open().socket();
+    sock.setReuseAddress(true);
+    sock.bind(new InetSocketAddress(port));
+    final TServerTransport transport = new TServerSocket(sock);
+    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+    options.processor(new Processor<Iface>(new Receiver()));
+    server = new TThreadPoolServer(options);
+    final InetSocketAddress address = new InetSocketAddress(hostname, sock.getLocalPort());
+    registerInZooKeeper(AddressUtil.toString(address));
+
+    writer = connector.createBatchWriter(table, newWriterConfig());
+  }
+
+  /** Schedules {@link #flush()} on the {@link SimpleTimer} (arguments 1000, 1000), then serves Thrift requests on the calling thread. */
+  public void run() throws Exception {
+    SimpleTimer.getInstance().schedule(new Runnable() {
+      @Override
+      public void run() {
+        flush();
+      }
+    }, 1000, 1000);
+    server.serve();
+  }
+
+  /** Flushes buffered spans. Replaces the writer if flushing throws anything, or if there is currently no writer. */
+  private void flush() {
+    BatchWriter current = writer;
+    if (current == null) {
+      // The last reset could not create a writer; retry instead of provoking (and logging) a NullPointerException on every tick.
+      resetWriter();
+      return;
+    }
+    try {
+      current.flush();
-    } catch (MutationsRejectedException e) {
++    } catch (Exception e) {
+      log.error("Error flushing traces", e);
+      resetWriter();
+    }
+  }
+
+  /**
+   * Installs a fresh writer built from {@link #newWriterConfig()}, then closes the previous one. If creation fails, the field becomes {@code null} and the next
+   * span or flush retries. Exceptions are logged, not thrown.
+   */
+  synchronized private void resetWriter() {
+    BatchWriter replacement = null;
+    try {
+      replacement = connector.createBatchWriter(table, newWriterConfig());
+    } catch (Exception ex) {
+      log.error("Unable to create a batch writer: " + ex, ex);
+    }
+    // Swap before closing so concurrent span() calls never observe a transient null while a successful reset is in progress.
+    BatchWriter previous = writer;
+    writer = replacement;
+    try {
+      if (previous != null)
+        previous.close();
+    } catch (Exception ex) {
+      log.error("Error closing batch writer", ex);
+    }
+  }
+
+  /** Creates an ephemeral sequential {@code trace-} node holding {@code name} under the instance's tracers path, and watches it with {@link #process}. */
+  private void registerInZooKeeper(String name) throws Exception {
+    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
+    zoo.exists(path, this);
+  }
+
+  /** Entry point: logs in, initializes the "tracer" server process, enables tracing and serves until the Thrift server stops. */
+  public static void main(String[] args) throws Exception {
+    SecurityUtil.serverLogin();
+    Instance instance = HdfsZooInstance.getInstance();
+    ServerConfiguration conf = new ServerConfiguration(instance);
+    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf.getConfiguration());
+    Accumulo.init(fs, conf, "tracer");
+    String hostname = Accumulo.getLocalAddress(args);
+    TraceServer server = new TraceServer(conf, hostname);
+    // NOTE(review): Accumulo.init above uses "tracer", but "tserver" is passed here (presumably the service name recorded on this process's own spans)
+    // -- confirm this is intended.
+    Accumulo.enableTracing(hostname, "tserver");
+    server.run();
+    log.info("tracer stopping");
+  }
+
+  /**
+   * Watches this server's ZooKeeper registration. Stops the Thrift server when the session expires or the node is deleted. For events that carry a path, it
+   * re-arms the watch and also stops the server if the node no longer exists or the watch cannot be re-set.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+    if (event.getState() == KeeperState.Expired) {
+      log.warn("Trace server lost zookeeper registration at " + event.getPath());
+      server.stop();
+    } else if (event.getType() == EventType.NodeDeleted) {
+      log.warn("Trace server zookeeper entry lost " + event.getPath());
+      server.stop();
+    }
+    if (event.getPath() != null) {
+      try {
+        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
+          return;
+      } catch (Exception ex) {
+        log.error(ex, ex);
+      }
+      log.warn("Trace server unable to reset watch on zookeeper registration");
+      server.stop();
+    }
+  }
+
+}