Repository: accumulo
Updated Branches:
  refs/heads/master 861db793d -> cbdf02e33


ACCUMULO-4409 Create AccumuloMonitorAppender


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9bc9ec32
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9bc9ec32
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9bc9ec32

Branch: refs/heads/master
Commit: 9bc9ec32c74ba0a313ac8fa0c3f7df5f6a3c6deb
Parents: a1cf8e2
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Thu Feb 23 22:55:24 2017 -0500
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Mon Mar 6 16:41:43 2017 -0500

----------------------------------------------------------------------
 .../monitor/util/AccumuloMonitorAppender.java   | 125 +++++++++++++++++++
 1 file changed, 125 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9bc9ec32/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
new file mode 100644
index 0000000..5802920
--- /dev/null
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.monitor.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.AsyncAppender;
+import org.apache.log4j.net.SocketAppender;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it.
+ *
+ * <p>
+ * A daemon thread periodically reads the address stored under {@link Constants#ZMONITOR_LOG4J_ADDR} and keeps a single {@link SocketAppender} attached to this
+ * appender that points at that address. When the address changes, the old socket appender is detached and closed and a new one is attached; while no address
+ * is registered, no socket appender is attached.
+ */
+public class AccumuloMonitorAppender extends AsyncAppender {
+
+  private final ScheduledExecutorService executorService;
+  private final AtomicBoolean trackerScheduled;
+
+  /**
+   * Creates the appender and the single-threaded scheduler used to watch the monitor location. Nothing is scheduled until {@link #activateOptions()} is called.
+   */
+  public AccumuloMonitorAppender() {
+    // create the background thread to watch for updates to monitor location
+    trackerScheduled = new AtomicBoolean(false);
+    executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
+      Thread t = new Thread(runnable, "MonitorLog4jLocationTracker");
+      // daemon, so the tracker never keeps the JVM alive on its own
+      t.setDaemon(true);
+      return t;
+    });
+  }
+
+  /**
+   * Starts the location tracker the first time it is called (first check after 5 seconds, then every 10 seconds) and then activates the underlying
+   * {@link AsyncAppender}. Later calls only delegate to the superclass.
+   */
+  @Override
+  public void activateOptions() {
+    if (trackerScheduled.compareAndSet(false, true)) {
+      executorService.scheduleAtFixedRate(new MonitorTracker(HdfsZooInstance.getInstance()), 5, 10, TimeUnit.SECONDS);
+    }
+    super.activateOptions();
+  }
+
+  /**
+   * Stops the location tracker and then closes the underlying {@link AsyncAppender}.
+   */
+  @Override
+  public void close() {
+    if (!executorService.isShutdown()) {
+      executorService.shutdownNow();
+    }
+    super.close();
+  }
+
+  /**
+   * Periodic task which compares the currently registered monitor address with the last one seen and swaps the attached {@link SocketAppender} when they
+   * differ. Its state is only touched from the scheduler's single thread.
+   */
+  private class MonitorTracker implements Runnable {
+
+    private final String path;
+    private final ZooCache zooCache;
+
+    // last address acted upon; null when no monitor address was registered at the last change
+    private byte[] lastLocation;
+    // appender currently attached to the enclosing appender; null when none is attached
+    private SocketAppender lastSocketAppender;
+
+    /**
+     * @param instance
+     *          the instance whose root path and ZooKeeper connection settings are used to look up the monitor's log4j address
+     */
+    public MonitorTracker(Instance instance) {
+      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
+      this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+
+      this.lastLocation = null;
+      this.lastSocketAppender = null;
+    }
+
+    /**
+     * Reads the registered address and switches appenders if it differs from the last one seen. Runtime failures are reported and swallowed so that the
+     * periodic schedule keeps running.
+     */
+    @Override
+    public void run() {
+      try {
+        byte[] loc = zooCache.get(path);
+        if (!Arrays.equals(loc, lastLocation)) {
+          // something changed
+          switchAppender(loc);
+        }
+      } catch (RuntimeException e) {
+        // An exception escaping a scheduleAtFixedRate task suppresses every later run, which would silently stop tracking the monitor. Report through
+        // log4j's internal LogLog rather than a Logger, since this code is itself part of an appender.
+        org.apache.log4j.helpers.LogLog.warn("Unable to update the monitor log forwarding location; will retry", e);
+      }
+    }
+
+    /**
+     * Detaches and closes the current socket appender, if any, and attaches a new one for the given address.
+     *
+     * @param loc
+     *          UTF-8 encoded {@code host[:port]} of the monitor, or null if no monitor address is registered
+     */
+    private void switchAppender(byte[] loc) {
+      // remove and close the last one, if it was non-null
+      if (lastSocketAppender != null) {
+        AccumuloMonitorAppender.this.removeAppender(lastSocketAppender);
+        lastSocketAppender.close();
+        // forget it, so a closed appender is never removed/closed a second time
+        lastSocketAppender = null;
+      }
+
+      // Remember the new location even when it is null; otherwise a monitor that disappears and later returns at the same address would look unchanged and
+      // never get a new appender. Recording it before parsing also means a malformed address is reported once rather than on every run.
+      lastLocation = loc;
+
+      // create a new one, if it is non-null
+      if (loc != null) {
+
+        int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue());
+        HostAndPort remote = HostAndPort.fromString(new String(loc, UTF_8));
+
+        SocketAppender socketAppender = new SocketAppender();
+        socketAppender.setRemoteHost(remote.getHostText());
+        socketAppender.setPort(remote.getPortOrDefault(defaultPort));
+
+        socketAppender.activateOptions();
+        AccumuloMonitorAppender.this.addAppender(socketAppender);
+        lastSocketAppender = socketAppender;
+      }
+    }
+
+  }
+
+}

Reply via email to