This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 62d2a6f4f5 Refactored DfsLogger constructors to static methods (#4192)
62d2a6f4f5 is described below

commit 62d2a6f4f579d0c43c6b4bc54f76ff6dd734182e
Author: Arbaaz Khan <bazzy...@yahoo.com>
AuthorDate: Wed Feb 7 17:08:36 2024 -0500

    Refactored DfsLogger constructors to static methods (#4192)
    
    * Refactored DfsLogger constructors to static methods that now call a
      private constructor
    * The open method is now private and is called on a new DfsLogger
      instance created in createNew
    
    Includes code review feedback from ctubbsii:
    
    * Push params used only for creating a new file down into the open
      method and remove from DfsLogger fields
    * Remove unneeded second private constructor
    * Tweak method names
    * Remove unnecessary mock object from test
    
    ---------
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 83 +++++++++++++---------
 .../accumulo/tserver/log/TabletServerLogger.java   |  4 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  2 +-
 .../accumulo/tserver/WalRemovalOrderTest.java      | 24 +------
 4 files changed, 55 insertions(+), 58 deletions(-)
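
 For readers skimming the diff below, here is a minimal sketch of how call sites change with this refactor. The factory signatures (DfsLogger.createNew and DfsLogger.fromLogEntry) are taken directly from the diff; the wrapper class DfsLoggerUsageSketch, its method names, and the passed-in values are illustrative placeholders standing in for the fields TabletServerLogger and Tablet already hold, not part of the change itself.

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.accumulo.core.tabletserver.log.LogEntry;
    import org.apache.accumulo.server.ServerContext;
    import org.apache.accumulo.tserver.log.DfsLogger;

    // Hypothetical helper class, for illustration only.
    class DfsLoggerUsageSketch {

      // Creating a brand-new WAL: the old two-step pattern of
      // "new DfsLogger(context, syncCounter, flushCounter)" followed by
      // "open(address)" is replaced by a single static factory that builds
      // the log path and opens the file internally.
      DfsLogger createWal(ServerContext context, AtomicLong syncCounter,
          AtomicLong flushCounter, String address) throws IOException {
        return DfsLogger.createNew(context, syncCounter, flushCounter, address);
      }

      // Referencing an existing (recovered) WAL: the old
      // "new DfsLogger(context, logEntry)" becomes a factory method that no
      // longer needs a ServerContext at all.
      DfsLogger referenceRecoveredWal(LogEntry logEntry) {
        return DfsLogger.fromLogEntry(logEntry);
      }
    }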

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 91e9382b71..c0ed0aed39 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.spi.crypto.FileDecrypter;
 import org.apache.accumulo.core.spi.crypto.FileEncrypter;
+import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.threads.Threads;
@@ -74,7 +75,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
 /**
@@ -116,12 +116,11 @@ public final class DfsLogger implements Comparable<DfsLogger> {
     }
   }
 
-  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<>();
+  private final LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<>();
 
   private final Object closeLock = new Object();
 
-  private static final DfsLogger.LogWork CLOSED_MARKER =
-      new DfsLogger.LogWork(null, Durability.FLUSH);
+  private static final LogWork CLOSED_MARKER = new LogWork(null, Durability.FLUSH);
 
   private static final LogFileValue EMPTY = new LogFileValue();
 
@@ -130,9 +129,19 @@ public final class DfsLogger implements Comparable<DfsLogger> {
   private class LogSyncingTask implements Runnable {
     private int expectedReplication = 0;
 
+    private final AtomicLong syncCounter;
+    private final AtomicLong flushCounter;
+    private final long slowFlushMillis;
+
+    LogSyncingTask(AtomicLong syncCounter, AtomicLong flushCounter, long slowFlushMillis) {
+      this.syncCounter = syncCounter;
+      this.flushCounter = flushCounter;
+      this.slowFlushMillis = slowFlushMillis;
+    }
+
     @Override
     public void run() {
-      ArrayList<DfsLogger.LogWork> work = new ArrayList<>();
+      ArrayList<LogWork> work = new ArrayList<>();
       boolean sawClosedMarker = false;
       while (!sawClosedMarker) {
         work.clear();
@@ -205,7 +214,7 @@ public final class DfsLogger implements Comparable<DfsLogger> {
           }
         }
 
-        for (DfsLogger.LogWork logWork : work) {
+        for (LogWork logWork : work) {
           if (logWork == CLOSED_MARKER) {
             sawClosedMarker = true;
           } else {
@@ -215,9 +224,9 @@ public final class DfsLogger implements Comparable<DfsLogger> {
       }
     }
 
-    private void fail(ArrayList<DfsLogger.LogWork> work, Exception ex, String why) {
+    private void fail(ArrayList<LogWork> work, Exception ex, String why) {
       log.warn("Exception {} {}", why, ex, ex);
-      for (DfsLogger.LogWork logWork : work) {
+      for (LogWork logWork : work) {
         logWork.exception = ex;
       }
     }
@@ -288,21 +297,34 @@ public final class DfsLogger implements Comparable<DfsLogger> {
     return logEntry.hashCode();
   }
 
-  private final ServerContext context;
   private FSDataOutputStream logFile;
   private DataOutputStream encryptingLogFile = null;
-  private LogEntry logEntry;
+  private final LogEntry logEntry;
   private Thread syncThread;
 
-  private AtomicLong syncCounter;
-  private AtomicLong flushCounter;
-  private final long slowFlushMillis;
   private long writes = 0;
 
-  public DfsLogger(ServerContext context, AtomicLong syncCounter, AtomicLong flushCounter) {
-    this(context, null);
-    this.syncCounter = syncCounter;
-    this.flushCounter = flushCounter;
+  /**
+   * Create a new DfsLogger with the provided characteristics.
+   */
+  public static DfsLogger createNew(ServerContext context, AtomicLong syncCounter,
+      AtomicLong flushCounter, String address) throws IOException {
+
+    String filename = UUID.randomUUID().toString();
+    String addressForFilename = address.replace(':', '+');
+
+    var chooserEnv =
+        new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.LOGGER, context);
+    String logPath =
+        context.getVolumeManager().choose(chooserEnv, context.getBaseUris()) + Path.SEPARATOR
+            + Constants.WAL_DIR + Path.SEPARATOR + addressForFilename + Path.SEPARATOR + filename;
+
+    LogEntry log = LogEntry.fromPath(logPath);
+    DfsLogger dfsLogger = new DfsLogger(log);
+    long slowFlushMillis =
+        context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
+    dfsLogger.open(context, logPath, filename, address, syncCounter, flushCounter, slowFlushMillis);
+    return dfsLogger;
   }
 
   /**
@@ -310,10 +332,11 @@ public final class DfsLogger implements Comparable<DfsLogger> {
    *
    * @param logEntry the "log" entry in +r/!0
    */
-  public DfsLogger(ServerContext context, LogEntry logEntry) {
-    this.context = context;
-    this.slowFlushMillis =
-        context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS);
+  public static DfsLogger fromLogEntry(LogEntry logEntry) {
+    return new DfsLogger(logEntry);
+  }
+
+  private DfsLogger(LogEntry logEntry) {
     this.logEntry = logEntry;
   }
 
@@ -372,19 +395,14 @@ public final class DfsLogger implements Comparable<DfsLogger> {
    *
    * @param address The address of the host using this WAL
    */
-  public synchronized void open(String address) throws IOException {
-    String filename = UUID.randomUUID().toString();
+  private synchronized void open(ServerContext context, String logPath, String filename,
+      String address, AtomicLong syncCounter, AtomicLong flushCounter, long slowFlushMillis)
+      throws IOException {
     log.debug("Address is {}", address);
-    String logger = Joiner.on("+").join(address.split(":"));
 
     log.debug("DfsLogger.open() begin");
-    VolumeManager fs = context.getVolumeManager();
 
-    var chooserEnv = new VolumeChooserEnvironmentImpl(
-        org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment.Scope.LOGGER, context);
-    String logPath = fs.choose(chooserEnv, context.getBaseUris()) + Path.SEPARATOR
-        + Constants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename;
-    this.logEntry = LogEntry.fromPath(logPath);
+    VolumeManager fs = context.getVolumeManager();
 
     LoggerOperation op;
     var serverConf = context.getConfiguration();
@@ -453,7 +471,8 @@ public final class DfsLogger implements Comparable<DfsLogger> {
       throw new IOException(ex);
     }
 
-    syncThread = Threads.createThread("Accumulo WALog thread " + this, new LogSyncingTask());
+    syncThread = Threads.createThread("Accumulo WALog thread " + this,
+        new LogSyncingTask(syncCounter, flushCounter, slowFlushMillis));
     syncThread.start();
     op.await();
     log.debug("Got new write-ahead log: {}", this);
@@ -548,7 +567,7 @@ public final class DfsLogger implements Comparable<DfsLogger> {
 
   private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys,
       Durability durability) throws IOException {
-    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability);
+    LogWork work = new LogWork(new CountDownLatch(1), durability);
     try {
       for (Pair<LogFileKey,LogFileValue> pair : keys) {
         write(pair.getFirst(), pair.getSecond());
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 3103d01044..dc66adc26a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -266,8 +266,8 @@ public class TabletServerLogger {
         DfsLogger alog = null;
 
         try {
-          alog = new DfsLogger(tserver.getContext(), syncCounter, flushCounter);
-          alog.open(tserver.getClientAddressString());
+          alog = DfsLogger.createNew(tserver.getContext(), syncCounter, flushCounter,
+              tserver.getClientAddressString());
         } catch (Exception t) {
           log.error("Failed to open WAL", t);
           // the log is not advertised in ZK yet, so we can just delete it if it exists
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f626e78720..90af6d4326 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -369,7 +369,7 @@ public class Tablet extends TabletBase {
       // make some closed references that represent the recovered logs
       currentLogs = new HashSet<>();
       for (LogEntry logEntry : logEntries) {
-        currentLogs.add(new DfsLogger(tabletServer.getContext(), logEntry));
+        currentLogs.add(DfsLogger.fromLogEntry(logEntry));
       }
 
       rebuildReferencedLogs();
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
index 69d7253a72..2e682b4231 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
@@ -18,10 +18,6 @@
  */
 package org.apache.accumulo.tserver;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Collections;
@@ -29,35 +25,17 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.log.DfsLogger;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Sets;
 
 public class WalRemovalOrderTest {
 
-  private ServerContext context;
-
-  @BeforeEach
-  private void createMocks() {
-    context = createMock(ServerContext.class);
-    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
-    replay(context);
-  }
-
-  @AfterEach
-  private void verifyMocks() {
-    verify(context);
-  }
-
   private DfsLogger mockLogger(String filename) {
     var mockLogEntry = LogEntry.fromPath(filename + "+1234/11111111-1111-1111-1111-111111111111");
-    return new DfsLogger(context, mockLogEntry);
+    return DfsLogger.fromLogEntry(mockLogEntry);
   }
 
   private LinkedHashSet<DfsLogger> mockLoggers(String... logs) {
