Merge branch '1.7' into 1.8

Conflicts:
	server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/037c1384 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/037c1384 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/037c1384 Branch: refs/heads/master Commit: 037c1384ac861be95d8c9d1c5d1acbf61e40e2fe Parents: 7c7bbab 47b57f7 Author: Sean Busbey <bus...@cloudera.com> Authored: Fri Dec 9 15:33:37 2016 -0600 Committer: Sean Busbey <bus...@cloudera.com> Committed: Fri Dec 9 15:33:37 2016 -0600 ---------------------------------------------------------------------- .../org/apache/accumulo/tracer/TraceServer.java | 76 ++++++++++++-------- 1 file changed, 48 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/037c1384/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 2b6bcbf,67bd9d5..7c0d9b2 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@@ -183,7 -185,36 +187,49 @@@ public class TraceServer implements Wat log.info("Version " + Constants.VERSION); log.info("Instance " + serverConfiguration.getInstance().getInstanceID()); AccumuloConfiguration conf = serverConfiguration.getConfiguration(); - table = conf.get(Property.TRACE_TABLE); + tableName = conf.get(Property.TRACE_TABLE); + connector = ensureTraceTableExists(conf); + - int port = conf.getPort(Property.TRACE_PORT); - final ServerSocket sock = ServerSocketChannel.open().socket(); - sock.setReuseAddress(true); - sock.bind(new InetSocketAddress(hostname, port)); ++ int ports[] = conf.getPort(Property.TRACE_PORT); ++ ServerSocket sock = null; ++ for (int port : ports) { ++ ServerSocket s = 
ServerSocketChannel.open().socket(); ++ s.setReuseAddress(true); ++ try { ++ s.bind(new InetSocketAddress(hostname, port)); ++ sock = s; ++ break; ++ } catch (Exception e) { ++ log.warn("Unable to start trace server on port {}", port); ++ } ++ } ++ if (null == sock) { ++ throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports)); ++ } + final TServerTransport transport = new TServerSocket(sock); + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + options.processor(new Processor<Iface>(new Receiver())); + server = new TThreadPoolServer(options); + registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH)); - writer = new AtomicReference<>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS))); ++ writer = new AtomicReference<>(this.connector.createBatchWriter(tableName, ++ new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS))); + } + + /** + * Exceptions thrown out of here should be things that cause service failure (e.g. misconfigurations that aren't likely to change on retry). + * + * @return a working Connection that can be reused + * @throws ClassNotFoundException + * if TRACE_TOKEN_TYPE is set to a class that we can't load. + * @throws InstantiationException + * if we fail to create an instance of TRACE_TOKEN_TYPE. + * @throws IllegalAccessException + * if the class pointed to by TRACE_TOKEN_TYPE is private. 
+ * @throws AccumuloSecurityException + * if the trace user has the wrong permissions + */ + private Connector ensureTraceTableExists(final AccumuloConfiguration conf) throws AccumuloSecurityException, ClassNotFoundException, InstantiationException, + IllegalAccessException { Connector connector = null; while (true) { try { @@@ -215,46 -246,20 +261,20 @@@ } connector = serverConfiguration.getInstance().getConnector(principal, at); - if (!connector.tableOperations().exists(table)) { - connector.tableOperations().create(table); + if (!connector.tableOperations().exists(tableName)) { + connector.tableOperations().create(tableName); IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName()); AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l); - connector.tableOperations().attachIterator(table, setting); + connector.tableOperations().attachIterator(tableName, setting); } - connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName()); + connector.tableOperations().setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName()); break; - } catch (RuntimeException ex) { + } catch (AccumuloException | TableExistsException | TableNotFoundException | IOException | RuntimeException ex) { log.info("Waiting to checking/create the trace table.", ex); - UtilWaitThread.sleep(1000); + sleepUninterruptibly(1, TimeUnit.SECONDS); } } - this.connector = connector; - // make sure we refer to the final variable from now on. 
- connector = null; - - int ports[] = conf.getPort(Property.TRACE_PORT); - ServerSocket sock = null; - for (int port : ports) { - ServerSocket s = ServerSocketChannel.open().socket(); - s.setReuseAddress(true); - try { - s.bind(new InetSocketAddress(hostname, port)); - sock = s; - break; - } catch (Exception e) { - log.warn("Unable to start trace server on port {}", port); - } - } - if (null == sock) { - throw new RuntimeException("Unable to start trace server on configured ports: " + Arrays.toString(ports)); - } - final TServerTransport transport = new TServerSocket(sock); - TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); - options.processor(new Processor<Iface>(new Receiver())); - server = new TThreadPoolServer(options); - registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH)); - writer = new AtomicReference<>(this.connector.createBatchWriter(tableName, - new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS))); + return connector; } public void run() throws Exception {