ACCUMULO-1998 Resolving corner cases around blocked output stream behavior and migration
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9ba06ff2 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9ba06ff2 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9ba06ff2 Branch: refs/heads/master Commit: 9ba06ff2d515d982b47b4f121a14bb5a98a024f0 Parents: 08a9804 Author: John Vines <vi...@apache.org> Authored: Thu Jan 9 17:35:02 2014 -0500 Committer: John Vines <vi...@apache.org> Committed: Thu Jan 16 17:43:51 2014 -0500 ---------------------------------------------------------------------- .../security/crypto/BlockedOutputStream.java | 6 +-- .../security/crypto/CryptoModuleFactory.java | 4 +- .../security/crypto/DefaultCryptoModule.java | 4 +- .../security/crypto/BlockedIOStreamTest.java | 44 ++++++++++++++++++++ .../apache/accumulo/tserver/log/DfsLogger.java | 12 +++--- 5 files changed, 58 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java index 9ca00b7..1f3cf3b 100644 --- a/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java +++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/BlockedOutputStream.java @@ -44,7 +44,7 @@ public class BlockedOutputStream extends OutputStream { } @Override - public void flush() throws IOException { + public synchronized void flush() throws IOException { int size = bb.position(); if (size == 0) return; @@ -64,9 +64,10 @@ public class BlockedOutputStream extends OutputStream { @Override public void write(int b) throws IOException { - bb.put((byte) b); + // Checking before provides same functionality but causes the case of previous flush() failing to now throw a buffer out of bounds error if (bb.remaining() == 0) flush(); + bb.put((byte) b); } @Override @@ -90,7 +91,6 @@ public class BlockedOutputStream extends OutputStream { @Override public void close() throws IOException { flush(); - bb = null; out.close(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java index 65acc6b..3fd43a0 100644 --- a/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/CryptoModuleFactory.java @@ -287,7 +287,9 @@ public class CryptoModuleFactory { params.setPadding(cipherTransformParts[2]); params.setRandomNumberGenerator(cryptoOpts.get(Property.CRYPTO_SECURE_RNG.getKey())); params.setRandomNumberGeneratorProvider(cryptoOpts.get(Property.CRYPTO_SECURE_RNG_PROVIDER.getKey())); - params.setBlockStreamSize(Integer.parseInt(cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey()))); + String blockStreamSize = cryptoOpts.get(Property.CRYPTO_BLOCK_STREAM_SIZE.getKey()); + if (blockStreamSize != null) + params.setBlockStreamSize(Integer.parseInt(blockStreamSize)); return params; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java index 347887c..dfad05e 100644 --- a/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java +++ b/core/src/main/java/org/apache/accumulo/core/security/crypto/DefaultCryptoModule.java @@ -355,7 +355,7 @@ public class DefaultCryptoModule implements CryptoModule { if (marker.equals(ENCRYPTION_HEADER_MARKER_V2)) params.setBlockStreamSize(dataIn.readInt()); else - params.setBlockStreamSize(-1); + params.setBlockStreamSize(0); } else { log.trace("Read something off of the encrypted input stream that was not the encryption header marker, so pushing back bytes and returning the given stream"); @@ -398,7 +398,7 @@ public class DefaultCryptoModule implements CryptoModule { InputStream blockedDecryptingInputStream = new CipherInputStream(params.getEncryptedInputStream(), cipher); - if (params.getBlockStreamSize() != -1) + if (params.getBlockStreamSize() > 0) blockedDecryptingInputStream = new BlockedInputStream(blockedDecryptingInputStream, cipher.getBlockSize(), params.getBlockStreamSize()); log.trace("Initialized cipher input stream with transformation ["+getCipherTransformation(params)+"]"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java index faba913..6fb52dd 100644 --- a/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java +++ b/core/src/test/java/org/apache/accumulo/core/security/crypto/BlockedIOStreamTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.Random; import org.apache.accumulo.core.Constants; import org.junit.Test; @@ -71,4 +72,47 @@ public class BlockedIOStreamTest { public void testSmallBufferBlockedIO() throws IOException { writeRead(16, (12 + 4) * (int) (Math.ceil(25.0/12) + Math.ceil(31.0/12))); } + + @Test + public void testSpillingOverOutputStream() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // buffer will be size 12 + BlockedOutputStream blockOut = new BlockedOutputStream(baos, 16, 16); + Random r = new Random(22); + + byte[] undersized = new byte[11]; + byte[] perfectSized = new byte[12]; + byte[] overSized = new byte[13]; + byte[] perfectlyOversized = new byte[13]; + byte filler = (byte) r.nextInt(); + + r.nextBytes(undersized); + r.nextBytes(perfectSized); + r.nextBytes(overSized); + r.nextBytes(perfectlyOversized); + + // 1 block + blockOut.write(undersized); + blockOut.write(filler); + blockOut.flush(); + + // 2 blocks + blockOut.write(perfectSized); + blockOut.write(filler); + blockOut.flush(); + + // 2 blocks + blockOut.write(overSized); + blockOut.write(filler); + blockOut.flush(); + + // 3 blocks + blockOut.write(undersized); + blockOut.write(perfectlyOversized); + blockOut.write(filler); + blockOut.flush(); + + baos.close(); + assertEquals(16*8, baos.toByteArray().length); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9ba06ff2/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index cc28ac2..3074614 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -366,14 +366,14 @@ public class DfsLogger { logFile.write(LOG_FILE_HEADER_V3.getBytes()); CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration()); - + params.setPlaintextOutputStream(new NoFlushOutputStream(logFile)); - + // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here, // so that that crypto module can re-read its own parameters. - + logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); - + params = cryptoModule.getEncryptingOutputStream(params); OutputStream encipheringOutputStream = params.getEncryptedOutputStream(); @@ -437,7 +437,7 @@ public class DfsLogger { log.info("Interrupted"); } } - + if (encryptingLogFile != null) try { encryptingLogFile.close(); @@ -492,7 +492,7 @@ public class DfsLogger { work.exception = e; } } - + synchronized (closeLock) { // use a different lock for close check so that adding to work queue does not need // to wait on walog I/O operations