This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 62d2a6f4f5 Refactored DfsLogger constructors to static methods (#4192) 62d2a6f4f5 is described below commit 62d2a6f4f579d0c43c6b4bc54f76ff6dd734182e Author: Arbaaz Khan <bazzy...@yahoo.com> AuthorDate: Wed Feb 7 17:08:36 2024 -0500 Refactored DfsLogger constructors to static methods (#4192) * Refactored DfsLogger constructors to static methods that now call a private constructor * The open method is now private and is called on a new DfsLogger instance created in createNew Includes code review feedback from ctubbsii: * Push params used only for creating a new file down into the open method and remove from DfsLogger fields * Remove unneeded second private constructor * Tweak method names * Remove unnecessary mock object from test --------- Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../org/apache/accumulo/tserver/log/DfsLogger.java | 83 +++++++++++++--------- .../accumulo/tserver/log/TabletServerLogger.java | 4 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +- .../accumulo/tserver/WalRemovalOrderTest.java | 24 +------ 4 files changed, 55 insertions(+), 58 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 91e9382b71..c0ed0aed39 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope; import org.apache.accumulo.core.spi.crypto.CryptoService; import org.apache.accumulo.core.spi.crypto.FileDecrypter; import org.apache.accumulo.core.spi.crypto.FileEncrypter; +import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; import 
org.apache.accumulo.core.util.threads.Threads; @@ -74,7 +75,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; /** @@ -116,12 +116,11 @@ public final class DfsLogger implements Comparable<DfsLogger> { } } - private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue<LogWork> workQueue = new LinkedBlockingQueue<>(); private final Object closeLock = new Object(); - private static final DfsLogger.LogWork CLOSED_MARKER = - new DfsLogger.LogWork(null, Durability.FLUSH); + private static final LogWork CLOSED_MARKER = new LogWork(null, Durability.FLUSH); private static final LogFileValue EMPTY = new LogFileValue(); @@ -130,9 +129,19 @@ public final class DfsLogger implements Comparable<DfsLogger> { private class LogSyncingTask implements Runnable { private int expectedReplication = 0; + private final AtomicLong syncCounter; + private final AtomicLong flushCounter; + private final long slowFlushMillis; + + LogSyncingTask(AtomicLong syncCounter, AtomicLong flushCounter, long slowFlushMillis) { + this.syncCounter = syncCounter; + this.flushCounter = flushCounter; + this.slowFlushMillis = slowFlushMillis; + } + @Override public void run() { - ArrayList<DfsLogger.LogWork> work = new ArrayList<>(); + ArrayList<LogWork> work = new ArrayList<>(); boolean sawClosedMarker = false; while (!sawClosedMarker) { work.clear(); @@ -205,7 +214,7 @@ public final class DfsLogger implements Comparable<DfsLogger> { } } - for (DfsLogger.LogWork logWork : work) { + for (LogWork logWork : work) { if (logWork == CLOSED_MARKER) { sawClosedMarker = true; } else { @@ -215,9 +224,9 @@ public final class DfsLogger implements Comparable<DfsLogger> { } } - private void fail(ArrayList<DfsLogger.LogWork> work, Exception ex, String why) { + private void fail(ArrayList<LogWork> work, Exception 
ex, String why) { log.warn("Exception {} {}", why, ex, ex); - for (DfsLogger.LogWork logWork : work) { + for (LogWork logWork : work) { logWork.exception = ex; } } @@ -288,21 +297,34 @@ public final class DfsLogger implements Comparable<DfsLogger> { return logEntry.hashCode(); } - private final ServerContext context; private FSDataOutputStream logFile; private DataOutputStream encryptingLogFile = null; - private LogEntry logEntry; + private final LogEntry logEntry; private Thread syncThread; - private AtomicLong syncCounter; - private AtomicLong flushCounter; - private final long slowFlushMillis; private long writes = 0; - public DfsLogger(ServerContext context, AtomicLong syncCounter, AtomicLong flushCounter) { - this(context, null); - this.syncCounter = syncCounter; - this.flushCounter = flushCounter; + /** + * Create a new DfsLogger with the provided characteristics. + */ + public static DfsLogger createNew(ServerContext context, AtomicLong syncCounter, + AtomicLong flushCounter, String address) throws IOException { + + String filename = UUID.randomUUID().toString(); + String addressForFilename = address.replace(':', '+'); + + var chooserEnv = + new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.LOGGER, context); + String logPath = + context.getVolumeManager().choose(chooserEnv, context.getBaseUris()) + Path.SEPARATOR + + Constants.WAL_DIR + Path.SEPARATOR + addressForFilename + Path.SEPARATOR + filename; + + LogEntry log = LogEntry.fromPath(logPath); + DfsLogger dfsLogger = new DfsLogger(log); + long slowFlushMillis = + context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS); + dfsLogger.open(context, logPath, filename, address, syncCounter, flushCounter, slowFlushMillis); + return dfsLogger; } /** @@ -310,10 +332,11 @@ public final class DfsLogger implements Comparable<DfsLogger> { * * @param logEntry the "log" entry in +r/!0 */ - public DfsLogger(ServerContext context, LogEntry logEntry) { - this.context = context; - 
this.slowFlushMillis = - context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FLUSH_MILLIS); + public static DfsLogger fromLogEntry(LogEntry logEntry) { + return new DfsLogger(logEntry); + } + + private DfsLogger(LogEntry logEntry) { this.logEntry = logEntry; } @@ -372,19 +395,14 @@ public final class DfsLogger implements Comparable<DfsLogger> { * * @param address The address of the host using this WAL */ - public synchronized void open(String address) throws IOException { - String filename = UUID.randomUUID().toString(); + private synchronized void open(ServerContext context, String logPath, String filename, + String address, AtomicLong syncCounter, AtomicLong flushCounter, long slowFlushMillis) + throws IOException { log.debug("Address is {}", address); - String logger = Joiner.on("+").join(address.split(":")); log.debug("DfsLogger.open() begin"); - VolumeManager fs = context.getVolumeManager(); - var chooserEnv = new VolumeChooserEnvironmentImpl( - org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment.Scope.LOGGER, context); - String logPath = fs.choose(chooserEnv, context.getBaseUris()) + Path.SEPARATOR - + Constants.WAL_DIR + Path.SEPARATOR + logger + Path.SEPARATOR + filename; - this.logEntry = LogEntry.fromPath(logPath); + VolumeManager fs = context.getVolumeManager(); LoggerOperation op; var serverConf = context.getConfiguration(); @@ -453,7 +471,8 @@ public final class DfsLogger implements Comparable<DfsLogger> { throw new IOException(ex); } - syncThread = Threads.createThread("Accumulo WALog thread " + this, new LogSyncingTask()); + syncThread = Threads.createThread("Accumulo WALog thread " + this, + new LogSyncingTask(syncCounter, flushCounter, slowFlushMillis)); syncThread.start(); op.await(); log.debug("Got new write-ahead log: {}", this); @@ -548,7 +567,7 @@ public final class DfsLogger implements Comparable<DfsLogger> { private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys, Durability durability) throws IOException 
{ - DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability); + LogWork work = new LogWork(new CountDownLatch(1), durability); try { for (Pair<LogFileKey,LogFileValue> pair : keys) { write(pair.getFirst(), pair.getSecond()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 3103d01044..dc66adc26a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -266,8 +266,8 @@ public class TabletServerLogger { DfsLogger alog = null; try { - alog = new DfsLogger(tserver.getContext(), syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); + alog = DfsLogger.createNew(tserver.getContext(), syncCounter, flushCounter, + tserver.getClientAddressString()); } catch (Exception t) { log.error("Failed to open WAL", t); // the log is not advertised in ZK yet, so we can just delete it if it exists diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index f626e78720..90af6d4326 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -369,7 +369,7 @@ public class Tablet extends TabletBase { // make some closed references that represent the recovered logs currentLogs = new HashSet<>(); for (LogEntry logEntry : logEntries) { - currentLogs.add(new DfsLogger(tabletServer.getContext(), logEntry)); + currentLogs.add(DfsLogger.fromLogEntry(logEntry)); } rebuildReferencedLogs(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java index 69d7253a72..2e682b4231 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java @@ -18,10 +18,6 @@ */ package org.apache.accumulo.tserver; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Collections; @@ -29,35 +25,17 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.log.DfsLogger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.google.common.collect.Sets; public class WalRemovalOrderTest { - private ServerContext context; - - @BeforeEach - private void createMocks() { - context = createMock(ServerContext.class); - expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); - replay(context); - } - - @AfterEach - private void verifyMocks() { - verify(context); - } - private DfsLogger mockLogger(String filename) { var mockLogEntry = LogEntry.fromPath(filename + "+1234/11111111-1111-1111-1111-111111111111"); - return new DfsLogger(context, mockLogEntry); + return DfsLogger.fromLogEntry(mockLogEntry); } private LinkedHashSet<DfsLogger> mockLoggers(String... logs) {