ACCUMULO-4533 TraceServer shouldn't abort given problems with trace table checks.
Signed-off-by: Mike Drob <md...@cloudera.com> Signed-off-by: Josh Elser <els...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47b57f73 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47b57f73 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47b57f73 Branch: refs/heads/1.8 Commit: 47b57f7300ae5d1df9536cf07d99c83bfb8e2af6 Parents: 3c45441 Author: Sean Busbey <bus...@cloudera.com> Authored: Wed Dec 7 10:45:51 2016 -0600 Committer: Sean Busbey <bus...@cloudera.com> Committed: Fri Dec 9 15:21:54 2016 -0600 ---------------------------------------------------------------------- .../org/apache/accumulo/tracer/TraceServer.java | 50 ++++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/47b57f73/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index dbb593d..67bd9d5 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -28,12 +28,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties; import org.apache.accumulo.core.client.security.tokens.KerberosToken; @@ -182,6 +186,35 @@ public class TraceServer implements Watcher { log.info("Instance " + serverConfiguration.getInstance().getInstanceID()); AccumuloConfiguration conf = serverConfiguration.getConfiguration(); table = conf.get(Property.TRACE_TABLE); + connector = ensureTraceTableExists(conf); + + int port = conf.getPort(Property.TRACE_PORT); + final ServerSocket sock = ServerSocketChannel.open().socket(); + sock.setReuseAddress(true); + sock.bind(new InetSocketAddress(hostname, port)); + final TServerTransport transport = new TServerSocket(sock); + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + options.processor(new Processor<Iface>(new Receiver())); + server = new TThreadPoolServer(options); + registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH)); + writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS))); + } + + /** + * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry). + * + * @return a working Connection that can be reused + * @throws ClassNotFoundException + * if TRACE_TOKEN_TYPE is set to a class that we can't load. + * @throws InstantiationException + * if we fail to create an instance of TRACE_TOKEN_TYPE. + * @throws IllegalAccessException + * if the class pointed to by TRACE_TOKEN_TYPE is private. + * @throws AccumuloSecurityException + * if the trace user has the wrong permissions + */ + private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException, + IllegalAccessException { Connector connector = null; while (true) { try { @@ -221,25 +254,12 @@ public class TraceServer implements Watcher { } connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName()); break; - } catch (RuntimeException ex) { + } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) { log.info("Waiting to checking/create the trace table.", ex); UtilWaitThread.sleep(1000); } } - this.connector = connector; - // make sure we refer to the final variable from now on. - connector = null; - - int port = conf.getPort(Property.TRACE_PORT); - final ServerSocket sock = ServerSocketChannel.open().socket(); - sock.setReuseAddress(true); - sock.bind(new InetSocketAddress(hostname, port)); - final TServerTransport transport = new TServerSocket(sock); - TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); - options.processor(new Processor<Iface>(new Receiver())); - server = new TThreadPoolServer(options); - registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH)); - writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS))); + return connector; } public void run() throws Exception {