Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
    server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7c309097
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7c309097
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7c309097

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 7c309097c40779e96d51dd6c70333b5fb1cb8dd7
Parents: deae04f 3b41d37
Author: Eric Newton <eric.new...@gmail.com>
Authored: Fri Jan 10 16:37:19 2014 -0500
Committer: Eric Newton <eric.new...@gmail.com>
Committed: Fri Jan 10 16:37:19 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/tserver/log/DfsLogger.java  | 56 +++++++++-----------
 .../tserver/log/TabletServerLogger.java         |  4 +-
 2 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 571d1bc,0000000..cc28ac2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -1,548 -1,0 +1,544 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.crypto.CryptoModule;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
 +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
 +import org.apache.accumulo.core.util.Daemon;
++import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Wrap a connection to a logger.
 + *
 + */
 +public class DfsLogger {
 +  // Package private so that LogSorter can find this
 +  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
 +  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 +
 +  private static Logger log = Logger.getLogger(DfsLogger.class);
 +
 +  public static class LogClosedException extends IOException {
 +    private static final long serialVersionUID = 1L;
 +
 +    public LogClosedException() {
 +      super("LogClosed");
 +    }
 +  }
 +
 +  public static class DFSLoggerInputStreams {
 +
 +    private FSDataInputStream originalInput;
 +    private DataInputStream decryptingInputStream;
 +
 +    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
 +      this.originalInput = originalInput;
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +
 +    public FSDataInputStream getOriginalInput() {
 +      return originalInput;
 +    }
 +
 +    public void setOriginalInput(FSDataInputStream originalInput) {
 +      this.originalInput = originalInput;
 +    }
 +
 +    public DataInputStream getDecryptingInputStream() {
 +      return decryptingInputStream;
 +    }
 +
 +    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +  }
 +
 +
 +  public interface ServerResources {
 +    AccumuloConfiguration getConfiguration();
 +
 +    VolumeManager getFileSystem();
 +
 +    Set<TServerInstance> getCurrentTServers();
 +  }
 +
 +  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
 +
 +  private final Object closeLock = new Object();
 +
- private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
++  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
 +
 +  private static final LogFileValue EMPTY = new LogFileValue();
 +
 +  private boolean closed = false;
 +
 +  private class LogSyncingTask implements Runnable {
 +
 +    @Override
 +    public void run() {
 +      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
 +      while (true) {
 +        work.clear();
 +
 +        try {
 +          work.add(workQueue.take());
 +        } catch (InterruptedException ex) {
 +          continue;
 +        }
 +        workQueue.drainTo(work);
 +
 +        synchronized (closeLock) {
 +          if (!closed) {
 +            try {
 +              sync.invoke(logFile);
 +            } catch (Exception ex) {
 +              log.warn("Exception syncing " + ex);
 +              for (DfsLogger.LogWork logWork : work) {
 +                logWork.exception = ex;
 +              }
 +            }
 +          } else {
 +            for (DfsLogger.LogWork logWork : work) {
 +              logWork.exception = new LogClosedException();
 +            }
 +          }
 +        }
 +
 +        boolean sawClosedMarker = false;
 +        for (DfsLogger.LogWork logWork : work)
 +          if (logWork == CLOSED_MARKER)
 +            sawClosedMarker = true;
 +          else
 +            logWork.latch.countDown();
 +
 +        if (sawClosedMarker) {
 +          synchronized (closeLock) {
 +            closeLock.notifyAll();
 +          }
 +          break;
 +        }
 +      }
 +    }
 +  }
 +
 +  static class LogWork {
-   List<TabletMutations> mutations;
 +    CountDownLatch latch;
 +    volatile Exception exception;
-
-   public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
-     this.mutations = mutations;
++
++    public LogWork(CountDownLatch latch) {
 +      this.latch = latch;
 +    }
 +  }
 +
 +  public static class LoggerOperation {
 +    private final LogWork work;
 +
 +    public LoggerOperation(LogWork work) {
 +      this.work = work;
 +    }
 +
 +    public void await() throws IOException {
 +      try {
 +        work.latch.await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      if (work.exception != null) {
 +        if (work.exception instanceof IOException)
 +          throw (IOException) work.exception;
 +        else if (work.exception instanceof RuntimeException)
 +          throw (RuntimeException) work.exception;
 +        else
 +          throw new RuntimeException(work.exception);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public boolean equals(Object obj) {
 +    // filename is unique
 +    if (obj == null)
 +      return false;
 +    if (obj instanceof DfsLogger)
 +      return getFileName().equals(((DfsLogger) obj).getFileName());
 +    return false;
 +  }
 +
 +  @Override
 +  public int hashCode() {
 +    // filename is unique
 +    return getFileName().hashCode();
 +  }
 +
 +  private final ServerResources conf;
 +  private FSDataOutputStream logFile;
 +  private DataOutputStream encryptingLogFile = null;
 +  private Method sync;
 +  private String logPath;
 +
 +  public DfsLogger(ServerResources conf) throws IOException {
 +    this.conf = conf;
 +  }
 +
 +  public DfsLogger(ServerResources conf, String filename) throws IOException {
 +    this.conf = conf;
 +    this.logPath = filename;
 +  }
 +
 +  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
 +    FSDataInputStream input = fs.open(path);
 +    DataInputStream decryptingInput = null;
 +
 +    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
 +    byte[] magicBuffer = new byte[magic.length];
 +    input.readFully(magicBuffer);
 +    if (Arrays.equals(magicBuffer, magic)) {
 +      // additional parameters it needs from the underlying stream.
 +      String cryptoModuleClassname = input.readUTF();
 +      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 +
 +      // Create the parameters and set the input stream into those parameters
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +      params.setEncryptedInputStream(input);
 +
 +      // Create the plaintext input stream from the encrypted one
 +      params = cryptoModule.getDecryptingInputStream(params);
 +
 +      if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +      } else {
 +        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +      }
 +    } else {
 +      input.seek(0);
 +      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
 +      byte[] magicBufferV2 = new byte[magic.length];
 +      input.readFully(magicBufferV2);
 +
 +      if (Arrays.equals(magicBufferV2, magicV2)) {
 +        // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class
 +        // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be
 +        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
 +
 +        // If it's null, we won't have any parameters whatsoever. First, let's attempt to read
 +        // parameters
 +        Map<String,String> opts = new HashMap<String,String>();
 +        int count = input.readInt();
 +        for (int i = 0; i < count; i++) {
 +          String key = input.readUTF();
 +          String value = input.readUTF();
 +          opts.put(key, value);
 +        }
 +
 +        if (opts.size() == 0) {
 +          // NullCryptoModule, we're done
 +          decryptingInput = input;
 +        } else {
 +
 +          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
 +          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
 +              .getCryptoModule(DefaultCryptoModule.class.getName());
 +
 +          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +
 +          input.seek(0);
 +          input.readFully(magicBuffer);
 +          params.setEncryptedInputStream(input);
 +
 +          params = cryptoModule.getDecryptingInputStream(params);
 +          if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +          } else {
 +            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +          }
 +        }
 +
 +      } else {
 +
 +        input.seek(0);
 +        decryptingInput = input;
 +      }
 +
 +    }
 +    return new DFSLoggerInputStreams(input, decryptingInput);
 +  }
 +
 +  public synchronized void open(String address) throws IOException {
 +    String filename = UUID.randomUUID().toString();
 +    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
 +
 +    log.debug("DfsLogger.open() begin");
 +    VolumeManager fs = conf.getFileSystem();
 +
 +    logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
 +    try {
 +      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
 +      if (replication == 0)
 +        replication = fs.getDefaultReplication(new Path(logPath));
 +      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
 +      if (blockSize == 0)
 +        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
 +      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
 +        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
 +      else
 +        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
 +
 +      try {
 +        NoSuchMethodException e = null;
 +        try {
 +          // sync: send data to datanodes
 +          sync = logFile.getClass().getMethod("sync");
 +        } catch (NoSuchMethodException ex) {
 +          e = ex;
 +        }
 +        try {
 +          // hsync: send data to datanodes and sync the data to disk
 +          sync = logFile.getClass().getMethod("hsync");
 +          e = null;
 +        } catch (NoSuchMethodException ex) {}
 +        if (e != null)
 +          throw new RuntimeException(e);
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +
 +      // Initialize the crypto operations.
 +      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
 +          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +
 +      // Initialize the log file with a header and the crypto params used to set up this log file.
 +      logFile.write(LOG_FILE_HEADER_V3.getBytes());
 +
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
 +
 +      params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
 +
 +      // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
 +      // so that that crypto module can re-read its own parameters.
 +
 +      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +
 +
 +      params = cryptoModule.getEncryptingOutputStream(params);
 +      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
 +
 +      // If the module just kicks back our original stream, then just use it, don't wrap it in
 +      // another data OutputStream.
 +      if (encipheringOutputStream == logFile) {
 +        encryptingLogFile = logFile;
 +      } else {
 +        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
 +      }
 +
 +      LogFileKey key = new LogFileKey();
 +      key.event = OPEN;
 +      key.tserverSession = filename;
 +      key.filename = filename;
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +      log.debug("Got new write-ahead log: " + this);
 +    } catch (Exception ex) {
 +      if (logFile != null)
 +        logFile.close();
 +      logFile = null;
 +      encryptingLogFile = null;
 +      throw new IOException(ex);
 +    }
 +
 +    Thread t = new Daemon(new LogSyncingTask());
 +    t.setName("Accumulo WALog thread " + toString());
 +    t.start();
 +  }
 +
 +  @Override
 +  public String toString() {
 +    String fileName = getFileName();
 +    if (fileName.contains(":"))
 +      return getLogger() + "/" + getFileName();
 +    return fileName;
 +  }
 +
 +  public String getFileName() {
 +    return logPath.toString();
 +  }
 +
 +  public void close() throws IOException {
 +
 +    synchronized (closeLock) {
 +      if (closed)
 +        return;
 +      // after closed is set to true, nothing else should be added to the queue
 +      // CLOSED_MARKER should be the last thing on the queue, therefore when the
 +      // background thread sees the marker and exits there should be nothing else
 +      // to process... so nothing should be left waiting for the background
 +      // thread to do work
 +      closed = true;
 +      workQueue.add(CLOSED_MARKER);
 +      while (!workQueue.isEmpty())
 +        try {
 +          closeLock.wait();
 +        } catch (InterruptedException e) {
 +          log.info("Interrupted");
 +        }
 +    }
 +
 +    if (encryptingLogFile != null)
 +      try {
 +        encryptingLogFile.close();
 +      } catch (IOException ex) {
 +        log.error(ex);
 +        throw new LogClosedException();
 +      }
 +  }
 +
 +  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
 +    // write this log to the METADATA table
 +    final LogFileKey key = new LogFileKey();
 +    key.event = DEFINE_TABLET;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.tablet = tablet;
 +    try {
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +    } catch (Exception ex) {
 +      log.error(ex);
 +      throw new IOException(ex);
 +    }
 +  }
 +
 +  /**
 +   * @param key
 +   * @param empty2
 +   * @throws IOException
 +   */
 +  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
 +    key.write(encryptingLogFile);
 +    value.write(encryptingLogFile);
 +    encryptingLogFile.flush();
 +  }
 +
 +  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
 +    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
 +  }
 +
- public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
-   DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
-
++  private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys) throws IOException {
++    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
 +    synchronized (DfsLogger.this) {
 +      try {
-       for (TabletMutations tabletMutations : mutations) {
-         LogFileKey key = new LogFileKey();
-         key.event = MANY_MUTATIONS;
-         key.seq = tabletMutations.getSeq();
-         key.tid = tabletMutations.getTid();
-         LogFileValue value = new LogFileValue();
-         value.mutations = tabletMutations.getMutations();
-         write(key, value);
++        for (Pair<LogFileKey,LogFileValue> pair : keys) {
++          write(pair.getFirst(), pair.getSecond());
 +        }
 +      } catch (ClosedChannelException ex) {
 +        throw new LogClosedException();
 +      } catch (Exception e) {
 +        log.error(e, e);
 +        work.exception = e;
 +      }
 +    }
 +
 +    synchronized (closeLock) {
 +      // use a different lock for close check so that adding to work queue does not need
 +      // to wait on walog I/O operations
 +
 +      if (closed)
 +        throw new LogClosedException();
 +      workQueue.add(work);
 +    }
 +
 +    return new LoggerOperation(work);
 +  }
 +
- public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
++  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
++    List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>();
++    for (TabletMutations tabletMutations : mutations) {
++      LogFileKey key = new LogFileKey();
++      key.event = MANY_MUTATIONS;
++      key.seq = tabletMutations.getSeq();
++      key.tid = tabletMutations.getTid();
++      LogFileValue value = new LogFileValue();
++      value.mutations = tabletMutations.getMutations();
++      data.add(new Pair<LogFileKey, LogFileValue>(key, value));
++    }
++    return logFileData(data);
++  }
++
++  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_FINISH;
 +    key.seq = seq;
 +    key.tid = tid;
-   try {
-     write(key, EMPTY);
-   } catch (IOException ex) {
-     log.error(ex);
-     throw ex;
-   }
++    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +
- public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
++  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_START;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.filename = fqfn;
-   try {
-     write(key, EMPTY);
-   } catch (IOException ex) {
-     log.error(ex);
-     throw ex;
-   }
++    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +
 +  public String getLogger() {
 +    String parts[] = logPath.split("/");
 +    return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a276a97,0000000..fb90757
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -1,430 -1,0 +1,430 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.ReadWriteLock;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.tserver.Tablet;
 +import org.apache.accumulo.tserver.Tablet.CommitSession;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Central logging facility for the TServerInfo.
 + *
 + * Forwards in-memory updates to remote logs, carefully writing the same data to every log, while maintaining the maximum thread parallelism for greater
 + * performance. As new logs are used and minor compactions are performed, the metadata table is kept up-to-date.
 + *
 + */
 +public class TabletServerLogger {
 +
 +  private static final Logger log = Logger.getLogger(TabletServerLogger.class);
 +
 +  private final AtomicLong logSizeEstimate = new AtomicLong();
 +  private final long maxSize;
 +
 +  private final TabletServer tserver;
 +
 +  // The current log set: always updated to a new set with every change of loggers
 +  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
 +
 +  // The current generation of logSet.
 +  // Because multiple threads can be using a log set at one time, a log
 +  // failure is likely to affect multiple threads, who will all attempt to
 +  // create a new logSet. This will cause many unnecessary updates to the
 +  // metadata table.
 +  // We'll use this generational counter to determine if another thread has
 +  // already fetched a new logSet.
 +  private AtomicInteger logSetId = new AtomicInteger();
 +
 +  // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
 +  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
 +
 +  private final AtomicInteger seqGen = new AtomicInteger();
 +
 +  private static boolean enabled(Tablet tablet) {
 +    return tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
 +  }
 +
 +  private static boolean enabled(CommitSession commitSession) {
 +    return enabled(commitSession.getTablet());
 +  }
 +
 +  static private abstract class TestCallWithWriteLock {
 +    abstract boolean test();
 +
 +    abstract void withWriteLock() throws IOException;
 +  }
 +
 +  /**
 +   * Pattern taken from the documentation for ReentrantReadWriteLock
 +   *
 +   * @param rwlock
 +   *          lock to use
 +   * @param code
 +   *          a test/work pair
 +   * @throws IOException
 +   */
 +  private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWriteLock code) throws IOException {
 +    // Get a read lock
 +    rwlock.readLock().lock();
 +    try {
 +      // does some condition exist that needs the write lock?
 +      if (code.test()) {
 +        // Yes, let go of the readlock
 +        rwlock.readLock().unlock();
 +        // Grab the write lock
 +        rwlock.writeLock().lock();
 +        try {
 +          // double-check the condition, since we let go of the lock
 +          if (code.test()) {
 +            // perform the work with with write lock held
 +            code.withWriteLock();
 +          }
 +        } finally {
 +          // regain the readlock
 +          rwlock.readLock().lock();
 +          // unlock the write lock
 +          rwlock.writeLock().unlock();
 +        }
 +      }
 +    } finally {
 +      // always let go of the lock
 +      rwlock.readLock().unlock();
 +    }
 +  }
 +
 +  public TabletServerLogger(TabletServer tserver, long maxSize) {
 +    this.tserver = tserver;
 +    this.maxSize = maxSize;
 +  }
 +
 +  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
 +    final int[] result = {-1};
 +    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +      boolean test() {
 +        copy.clear();
 +        copy.addAll(loggers);
 +        if (!loggers.isEmpty())
 +          result[0] = logSetId.get();
 +        return loggers.isEmpty();
 +      }
 +
 +      void withWriteLock() throws IOException {
 +        try {
 +          createLoggers();
 +          copy.clear();
 +          copy.addAll(loggers);
 +          if (copy.size() > 0)
 +            result[0] = logSetId.get();
 +          else
 +            result[0] = -1;
 +        } catch (IOException e) {
 +          log.error("Unable to create loggers", e);
 +        }
 +      }
 +    });
 +    return result[0];
 +  }
 +
 +  public void getLogFiles(Set<String> loggersOut) {
 +    logSetLock.readLock().lock();
 +    try {
 +      for (DfsLogger logger : loggers) {
 +        loggersOut.add(logger.getFileName());
 +      }
 +    } finally {
 +      logSetLock.readLock().unlock();
 +    }
 +  }
 +
 +  synchronized private void createLoggers() throws IOException {
 +    if (!logSetLock.isWriteLockedByCurrentThread()) {
 +      throw new IllegalStateException("createLoggers should be called with write lock held!");
 +    }
 +
 +    if (loggers.size() != 0) {
 +      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
 +    }
 +
 +    try {
 +      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
 +      alog.open(tserver.getClientAddressString());
 +      loggers.add(alog);
 +      logSetId.incrementAndGet();
 +      return;
 +    } catch (Exception t) {
 +      throw new RuntimeException(t);
 +    }
 +  }
 +
 +  public void resetLoggers() throws IOException {
 +    logSetLock.writeLock().lock();
 +    try {
 +      close();
 +    } finally {
 +      logSetLock.writeLock().unlock();
 +    }
 +  }
 +
 +  synchronized private void close() throws IOException {
 +    if (!logSetLock.isWriteLockedByCurrentThread()) {
 +      throw new IllegalStateException("close should be called with write lock held!");
 +    }
 +    try {
 +      for (DfsLogger logger : loggers) {
 +        try {
 +          logger.close();
 +        } catch (DfsLogger.LogClosedException ex) {
 +          // ignore
 +        } catch (Throwable ex) {
 +          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
 +        }
 +      }
 +      loggers.clear();
 +      logSizeEstimate.set(0);
 +    } catch (Throwable t) {
 +      throw new IOException(t);
 +    }
 +  }
 +
 +  interface Writer {
 +    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
 +  }
 +
 +  private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
 +    List<CommitSession> sessions = Collections.singletonList(commitSession);
 +    return write(sessions, mincFinish, writer);
 +  }
 +
 +  private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
 +    // Work very hard not to lock this during calls to the outside world
 +    int currentLogSet = logSetId.get();
 +
 +    int seq = -1;
 +
 +    int attempt = 0;
 +    boolean success = false;
 +    while (!success) {
 +      try {
 +        // get a reference to the loggers that no other thread can touch
 +        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
 +        currentLogSet = initializeLoggers(copy);
 +
 +        // add the logger to the log set for the memory in the tablet,
 +        // update the metadata table if we've never used this tablet
 +
 +        if (currentLogSet == logSetId.get()) {
 +          for (CommitSession commitSession : sessions) {
 +            if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
 +              try {
 +                // Scribble out a tablet definition and then write to the metadata table
 +                defineTablet(commitSession);
 +                if (currentLogSet == logSetId.get())
 +                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
 +              } finally {
 +                commitSession.finishUpdatingLogsUsed();
 +              }
 +            }
 +          }
 +        }
 +
 +        // Make sure that the logs haven't changed out from underneath our copy
 +        if (currentLogSet == logSetId.get()) {
 +
 +          // write the mutation to the logs
 +          seq = seqGen.incrementAndGet();
 +          if (seq < 0)
 +            throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven");
 +          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
 +          for (DfsLogger wal : copy) {
 +            LoggerOperation lop = writer.write(wal, seq);
 +            if (lop != null)
 +              queuedOperations.add(lop);
 +          }
 +
 +          for (LoggerOperation lop : queuedOperations) {
 +            lop.await();
 +          }
 +
 +          // double-check: did the log set change?
 +          success = (currentLogSet == logSetId.get());
 +        }
 +      } catch (DfsLogger.LogClosedException ex) {
 +        log.debug("Logs closed while writing, retrying " + (attempt + 1));
 +      } catch (Exception t) {
 +        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
 +        UtilWaitThread.sleep(100);
 +      } finally {
 +        attempt++;
 +      }
 +      // Some sort of write failure occurred. Grab the write lock and reset the logs.
 +      // But since multiple threads will attempt it, only attempt the reset when
 +      // the logs haven't changed.
 +      final int finalCurrent = currentLogSet;
 +      if (!success) {
 +        testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +
 +          @Override
 +          boolean test() {
 +            return finalCurrent == logSetId.get();
 +          }
 +
 +          @Override
 +          void withWriteLock() throws IOException {
 +            close();
 +          }
 +        });
 +      }
 +    }
 +    // if the log gets too big, reset it .. grab the write lock first
 +    logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
 +    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +      boolean test() {
 +        return logSizeEstimate.get() > maxSize;
 +      }
 +
 +      void withWriteLock() throws IOException {
 +        close();
 +      }
 +    });
 +    return seq;
 +  }
 +
 +  public int defineTablet(final CommitSession commitSession) throws IOException {
 +    // scribble this into the metadata tablet, too.
 +    if (!enabled(commitSession))
 +      return -1;
 +    return write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
 +        return null;
 +      }
 +    });
 +  }
 +
 +  public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
 +    if (!enabled(commitSession))
 +      return -1;
 +    int seq = write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        return logger.log(tabletSeq, commitSession.getLogId(), m);
 +      }
 +    });
 +    logSizeEstimate.addAndGet(m.numBytes());
 +    return seq;
 +  }
 +
 +  public int logManyTablets(Map<CommitSession,List<Mutation>> mutations) throws IOException {
 +
 +    final Map<CommitSession,List<Mutation>> loggables = new HashMap<CommitSession,List<Mutation>>(mutations);
 +    for (CommitSession t : mutations.keySet()) {
 +      if (!enabled(t))
 +        loggables.remove(t);
 +    }
 +    if (loggables.size() == 0)
 +      return -1;
 +
 +    int seq = write(loggables.keySet(), false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
 +        for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
 +          CommitSession cs = entry.getKey();
 +          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
 +        }
 +        return logger.logManyTablets(copy);
 +      }
 +    });
 +    for (List<Mutation> entry : loggables.values()) {
 +      if (entry.size() < 1)
 +        throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
 +      for (Mutation m : entry) {
 +        logSizeEstimate.addAndGet(m.numBytes());
 +      }
 +    }
 +    return seq;
 +  }
 +
 +  public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException {
 +
 +    if (!enabled(commitSession))
 +      return;
 +
 +    long t1 = System.currentTimeMillis();
 +
 +    int seq = write(commitSession, true, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-       logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
++        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName).await();
 +        return null;
 +      }
 +    });
 +
 +    long t2 = System.currentTimeMillis();
 +
 +    log.debug(" wrote MinC finish " + seq + ": writeTime:" + (t2 - t1) + "ms ");
 +  }
 +
 +  public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException {
 +    if (!enabled(commitSession))
 +      return -1;
 +    write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-       logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
++        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName).await();
 +        return null;
 +      }
 +    });
 +    return seq;
 +  }
 +
 +  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
 +    if (!enabled(tablet))
 +      return;
 +    try {
 +      SortedLogRecovery recovery = new SortedLogRecovery(fs);
 +      KeyExtent extent = tablet.getExtent();
 +      recovery.recover(extent, logs, tabletFiles, mr);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +  }
 +
 +}
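----------------------------------------------------------------------

The heart of DfsLogger is a group-commit loop: writers enqueue a LogWork carrying a CountDownLatch, one daemon thread (LogSyncingTask) drains the queue, issues a single sync covering the whole batch, and then releases every waiting writer; a sentinel object shuts the loop down. A minimal, self-contained sketch of that pattern (class and method names here are illustrative, not Accumulo's):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class GroupSync {
  static class Work {
    final CountDownLatch latch = new CountDownLatch(1);
    volatile Exception exception;
  }

  private static final Work CLOSED = new Work(); // sentinel, like CLOSED_MARKER
  private final LinkedBlockingQueue<Work> queue = new LinkedBlockingQueue<Work>();

  // one daemon thread runs this loop
  void syncLoop() throws InterruptedException {
    List<Work> batch = new ArrayList<Work>();
    while (true) {
      batch.clear();
      batch.add(queue.take());      // block for the first request
      queue.drainTo(batch);         // then grab everything queued behind it
      try {
        expensiveSync();            // one sync covers the whole batch
      } catch (Exception e) {
        for (Work w : batch)
          w.exception = e;          // fail every waiter, not just one
      }
      boolean sawClosed = false;
      for (Work w : batch)
        if (w == CLOSED)
          sawClosed = true;
        else
          w.latch.countDown();      // release the waiting writer
      if (sawClosed)
        break;
    }
  }

  // writers call this, then block on the returned work's latch
  Work enqueue() {
    Work w = new Work();
    queue.add(w);
    return w;
  }

  void close() {
    queue.add(CLOSED);              // last item; the loop exits after draining it
  }

  void expensiveSync() { /* e.g. an hsync() to HDFS */ }
}

This is why a single slow hsync amortizes across every mutation batch queued behind it: the latch decouples enqueueing from durability, which is exactly what LoggerOperation.await() exposes to callers.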
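open() locates its flush method by reflection because the method name changed across Hadoop versions: hsync() (flush to datanodes and force to disk) is preferred when present, otherwise sync() (flush to datanodes only). A hedged sketch of the same probe, written against a plain Object so it stands alone:

import java.lang.reflect.Method;

final class SyncMethodProbe {
  // Mirrors the lookup in DfsLogger.open(): try sync() first, let hsync()
  // override it if available, and fail only when neither exists.
  static Method findSync(Object stream) {
    Method m = null;
    try {
      m = stream.getClass().getMethod("sync");   // older Hadoop
    } catch (NoSuchMethodException ignored) {}
    try {
      m = stream.getClass().getMethod("hsync");  // newer Hadoop, stronger guarantee
    } catch (NoSuchMethodException ignored) {}
    if (m == null)
      throw new RuntimeException("stream has neither sync() nor hsync()");
    return m;
  }
}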
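readHeaderAndReturnStream() distinguishes WAL versions by reading a fixed-length magic string and rewinding on a miss: v3 headers name a crypto module, v2 headers carry raw options, and anything else is a headerless pre-1.5 file read from byte 0. The sketch below shows just the detection skeleton, with java.io.RandomAccessFile standing in for FSDataInputStream (an assumption made for self-containment; the real code seeks on the HDFS stream):

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

final class WalHeaderProbe {
  static final String V2 = "--- Log File Header (v2) ---";
  static final String V3 = "--- Log File Header (v3) ---";

  // Returns 3, 2, or 0 (headerless file), rewinding before each retry.
  static int detectVersion(RandomAccessFile in) throws IOException {
    byte[] magic3 = V3.getBytes();
    byte[] buf = new byte[magic3.length];
    in.readFully(buf);
    if (Arrays.equals(buf, magic3))
      return 3;
    in.seek(0);                 // wrong magic: rewind and try the older format
    byte[] magic2 = V2.getBytes();
    in.readFully(buf);          // both headers happen to be the same length
    if (Arrays.equals(buf, magic2))
      return 2;
    in.seek(0);                 // no header at all: leave the stream at byte 0
    return 0;
  }
}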
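testLockAndRun() in TabletServerLogger is the ReentrantReadWriteLock idiom from the JDK documentation: test under the read lock, release it (a read lock cannot be upgraded in place), take the write lock, re-test because another thread may have done the work in the gap, act, then downgrade by reacquiring the read lock before releasing the write lock. Stripped of the Accumulo callback types, the shape is:

import java.util.concurrent.locks.ReadWriteLock;

final class TestThenWrite {
  interface Check {
    boolean test();
  }

  interface Action {
    void run();
  }

  static void testLockAndRun(ReadWriteLock rwlock, Check check, Action work) {
    rwlock.readLock().lock();
    try {
      if (check.test()) {
        rwlock.readLock().unlock();    // cannot upgrade; drop the read lock first
        rwlock.writeLock().lock();
        try {
          if (check.test())            // re-test: another thread may have acted
            work.run();
        } finally {
          rwlock.readLock().lock();    // downgrade: take the read lock back before...
          rwlock.writeLock().unlock(); // ...letting the write lock go
        }
      }
    } finally {
      rwlock.readLock().unlock();      // matches the initial (or downgraded) read lock
    }
  }
}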
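The retry loop in write() leans on logSetId, a generation counter for the current log set: each writer snapshots the generation, and after a failure only a thread whose snapshot still matches performs the reset, so one bad WAL does not trigger one reset per blocked writer. A reduced sketch of that idea (the real code does the re-check under the write lock via testLockAndRun; a synchronized method stands in for that here):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

final class GenerationalReset {
  private final AtomicInteger generation = new AtomicInteger();

  void writeWithRetry() throws IOException {
    boolean success = false;
    while (!success) {
      int snapshot = generation.get();             // which log set this attempt uses
      try {
        appendAndSync();
        // if the set changed underneath us, another thread reset it; try again
        success = (snapshot == generation.get());
      } catch (IOException e) {
        resetIfCurrent(snapshot);                  // at most one reset per failure wave
      }
    }
  }

  private synchronized void resetIfCurrent(int snapshot) {
    // re-check under the lock, as testLockAndRun's double-check does
    if (snapshot == generation.get())
      generation.incrementAndGet();                // invalidates the other snapshots
  }

  private void appendAndSync() throws IOException { /* write to every WAL copy */ }
}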
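Note the changed contract this commit introduces: minorCompactionStarted() and minorCompactionFinished() no longer write and sync inline under `synchronized`; they funnel through logFileData() and hand back a LoggerOperation, and the callers in TabletServerLogger now call .await(), so a minor-compaction event is only treated as durable once the shared sync thread has flushed it. A caller-side fragment using the method names from this diff (the surrounding setup, an open DfsLogger and the tablet's seq/tid, is assumed):

// assumed context: an open DfsLogger `wal` plus seq/tid/file from the caller
void recordMincStart(DfsLogger wal, int seq, int tid, String file) throws IOException {
  DfsLogger.LoggerOperation op = wal.minorCompactionStarted(seq, tid, file);
  // the enqueue returned immediately; block until the group-commit thread syncs it
  op.await();
}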