[
https://issues.apache.org/jira/browse/HADOOP-13578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668094#comment-15668094
]
churro morales commented on HADOOP-13578:
-----------------------------------------
[~jlowe]
Thanks for the feedback. As for your concerns:
bq. The problem with ignoring the Codec interface is that code using the
Compressor/Decompressor interfaces will break if it happens to get the zstd
codec but work for all others. That's not really adding a full Codec for zstd.
I agree it's not the most ideal interface for the way zstd wants to be called,
but that doesn't change the fact that existing code that should work as-is on a
Hadoop-provided codec will not if we don't implement the
Compressor/Decompressor interfaces.
As much as it pains me, I agree with your sentiments. I will implement the
Compressor / Decompressor interfaces.
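For anyone following along, this is roughly the calling contract those interfaces impose: a simplified sketch of the setInput / needsInput / compress loop that CompressorStream drives (the helper method and buffer size here are just for illustration, not actual Hadoop code).
{code:java}
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class CompressorContractSketch {
  // Simplified version of the loop CompressorStream runs against any
  // Compressor implementation: push input, drain output, finish, drain.
  static void writeThrough(Compressor compressor, byte[] data,
      OutputStream out) throws IOException {
    byte[] buf = new byte[64 * 1024]; // illustrative scratch buffer
    compressor.setInput(data, 0, data.length);
    while (!compressor.needsInput()) {
      int n = compressor.compress(buf, 0, buf.length);
      if (n > 0) {
        out.write(buf, 0, n);
      }
    }
    compressor.finish(); // no more input is coming
    while (!compressor.finished()) {
      int n = compressor.compress(buf, 0, buf.length);
      if (n > 0) {
        out.write(buf, 0, n);
      }
    }
  }
}
{code}
The clunky part for zstd is that control is inverted: the library wants to feed and drain its own stream, while this contract has the framework pull compressed bytes out one call at a time.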
bq. It looks like the zstd code already includes a wrapper that translates the
zlib API into the zstd API (see
https://github.com/facebook/zstd/tree/dev/zlibWrapper) which we can maybe
leverage or at least use as a model for the Compressor/Decompressor
implementations.
Good point. I did look at using the zlib wrapper originally, but in v1.0.0
(https://github.com/facebook/zstd/tree/v1.0.0/zlibWrapper) it didn't support
deflateReset and inflateReset, which would have errored out and broken our
code. No worries though; I think I have it figured out using the
Compressor / Decompressor logic without having to reference the zlibWrapper.
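To make the pooling requirement concrete, here is a sketch of the lifecycle I have to support (hypothetical helper; the key point is that returnCompressor() resets the instance in place, which is the deflateReset-style capability the v1.0.0 wrapper lacked):
{code:java}
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

public class PooledCompressSketch {
  // Compress one payload using a pooled compressor, then return it.
  static void compressWithPool(CompressionCodec codec, byte[] data,
      OutputStream out) throws IOException {
    Compressor compressor = CodecPool.getCompressor(codec);
    try {
      CompressionOutputStream cos = codec.createOutputStream(out, compressor);
      cos.write(data);
      cos.finish();
    } finally {
      // Resets the compressor before pooling it, so a later
      // getCompressor(codec) may hand back this same instance; the
      // native context must therefore support an in-place reset.
      CodecPool.returnCompressor(compressor);
    }
  }
}
{code}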
bq. To be clear, I think it's a good thing that the compressor/decompressor
streams are using zstd more natively, but I also think it's important to not
ignore the non-stream Codec APIs.
I think the reason we need to implement the Compressor / Decompressor
interfaces is mostly for reuse via the CodecPool. Each compressor /
decompressor / codec is very much tied to whether the compression library has
been implemented using a stream-based or block-based approach. From what I can
see, the API is called as follows:
{code}
Compressor compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(outputStream, compressor);
// do work, then CodecPool.returnCompressor(compressor)
{code}
And when we look at specific implementations of codec.createOutputStream(),
codecs like Snappy always return a Block(Compressor|Decompressor)Stream, and
it seems to me that the Snappy(Compressor|Decompressor) can only work with
that type of stream. It looks like the way it is used everywhere (including
HBase) is that we always get a stream and work on it; the compressor /
decompressor are passed in so we have a pool and don't have to constantly
re-initialize these native libraries.
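To illustrate, SnappyCodec.createOutputStream looks roughly like this (paraphrased from current trunk, with conf being the codec's Configuration; the exact buffer-size plumbing is abbreviated):
{code:java}
// Paraphrase: Snappy always wraps the compressor in a
// BlockCompressorStream, which writes the length-prefixed blocks that
// Snappy(Compressor|Decompressor) expects.
public CompressionOutputStream createOutputStream(OutputStream out,
    Compressor compressor) throws IOException {
  int bufferSize = conf.getInt(
      CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
      CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
  int compressionOverhead = (bufferSize / 6) + 32;
  return new BlockCompressorStream(out, compressor, bufferSize,
      compressionOverhead);
}
{code}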
For example, I wrote the test below: if you use the
Compression(Input|Output)Streams that are native to the codec, the test
passes. But if you create a Compressor / Decompressor and pair it with a
Compression(Input|Output)Stream that is not specific to the codec, the test
fails.
{code:java}
@Test
public void testHandlingWithStreams() throws Exception {
  byte[] bytes = BytesGenerator.get(1024 * 64);
  ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
  SnappyCodec codec = new SnappyCodec();
  codec.setConf(new Configuration());
  Compressor compressor = codec.createCompressor();
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  CompressionOutputStream compressorStream =
      new CompressorStream(baos, compressor);
  // if you replace compressorStream with
  // CompressionOutputStream compressorStream = codec.createOutputStream(baos, compressor);
  // things work fine.
  byte[] buffer = new byte[100];
  int result;
  while ((result = inputStream.read(buffer, 0, buffer.length)) != -1) {
    compressorStream.write(buffer, 0, result);
  }
  compressorStream.flush();
  compressorStream.finish();
  // let's make sure the compressed bytes can be decompressed and read back
  byte[] compressedBytes = baos.toByteArray();
  ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
  baos = new ByteArrayOutputStream();
  Decompressor decompressor = codec.createDecompressor();
  CompressionInputStream decompressorStream =
      new DecompressorStream(bais, decompressor);
  // if you replace decompressorStream with
  // CompressionInputStream decompressorStream = codec.createInputStream(bais, decompressor);
  // things work fine.
  byte[] decompressionBuffer = new byte[100];
  while ((result = decompressorStream.read(decompressionBuffer, 0,
      decompressionBuffer.length)) != -1) {
    baos.write(decompressionBuffer, 0, result);
  }
  decompressorStream.close();
  byte[] decompressedBytes = baos.toByteArray();
  assertArrayEquals(bytes, decompressedBytes);
}
{code}
bq. I'm also not so sure about the minimum buffer assertion. I saw in the zstd
unit tests there is a byte-by-byte streaming decompression test where it tries
to decompress a buffer with a 1-byte input buffer and a 1-byte output buffer.
You are absolutely correct. I will allow the user to specify the buffer size
and default to the one ZStandard recommends.
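Concretely, I am picturing something along these lines (the key name, default, and native accessor are placeholders until the patch is up, not the final implementation):
{code:java}
import org.apache.hadoop.conf.Configuration;

public class ZStandardBufferSizeSketch {
  // Placeholder key and default, not necessarily what the patch will use.
  public static final String ZSTD_BUFFER_SIZE_KEY =
      "io.compression.codec.zstd.buffersize";
  public static final int ZSTD_BUFFER_SIZE_DEFAULT = 0; // 0 = library default

  // Hypothetical JNI hook surfacing the sizes the native library
  // recommends via ZSTD_CStreamInSize() / ZSTD_DStreamInSize().
  static native int recommendedNativeBufferSize();

  static int getBufferSize(Configuration conf) {
    int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY, ZSTD_BUFFER_SIZE_DEFAULT);
    return size > 0 ? size : recommendedNativeBufferSize();
  }
}
{code}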
I almost have a patch ready that implements the Compressor / Decompressor
interfaces. Unfortunately this API makes things a bit clunky, but it is needed
for the pooling so we don't have to reinitialize the shared libraries.
I will have a new patch up soon. Thank you for taking the time to comment on
and review this work; it is much appreciated.
> Add Codec for ZStandard Compression
> -----------------------------------
>
> Key: HADOOP-13578
> URL: https://issues.apache.org/jira/browse/HADOOP-13578
> Project: Hadoop Common
> Issue Type: New Feature
> Reporter: churro morales
> Assignee: churro morales
> Attachments: HADOOP-13578.patch, HADOOP-13578.v1.patch,
> HADOOP-13578.v2.patch
>
>
> ZStandard: https://github.com/facebook/zstd has been used in production at
> Facebook for 6 months now. v1.0 was recently released. Create a codec for
> this library.