# IGNITE-141 - S3 fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/236f40f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/236f40f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/236f40f6 Branch: refs/heads/ignite-410 Commit: 236f40f6dca77e6f5d2f3e749ffe14153e04b452 Parents: 116f52f Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Thu Mar 5 22:17:44 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Thu Mar 5 22:17:44 2015 -0800 ---------------------------------------------------------------------- .../spi/checkpoint/s3/S3CheckpointData.java | 62 ++++++++++++++++++-- .../spi/checkpoint/s3/S3CheckpointSpi.java | 14 ++--- 2 files changed, 64 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/236f40f6/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointData.java ---------------------------------------------------------------------- diff --git a/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointData.java b/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointData.java index 3f50048..9b18615 100644 --- a/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointData.java +++ b/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointData.java @@ -29,10 +29,7 @@ import java.io.*; * Host name is used by {@link S3CheckpointSpi} SPI to give node * correct files if it is restarted. */ -class S3CheckpointData implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - +class S3CheckpointData { /** Checkpoint data. */ private final byte[] state; @@ -84,6 +81,63 @@ class S3CheckpointData implements Serializable { return key; } + /** + * @return Serialized checkpoint data. 
+     */
+    public byte[] toBytes() {
+        byte[] keyBytes = key.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+        // Layout: [state.length:4][state][expTime:8][keyBytes.length:4][keyBytes]; key is always UTF-8.
+        byte[] bytes = new byte[4 + state.length + 8 + 4 + keyBytes.length];
+
+        U.intToBytes(state.length, bytes, 0);
+        U.arrayCopy(state, 0, bytes, 4, state.length);
+        U.longToBytes(expTime, bytes, 4 + state.length);
+        U.intToBytes(keyBytes.length, bytes, 4 + state.length + 8);
+        U.arrayCopy(keyBytes, 0, bytes, 4 + state.length + 8 + 4, keyBytes.length);
+
+        return bytes;
+    }
+
+    /**
+     * @param in Input stream positioned at data laid out as written by {@link #toBytes()}.
+     * @return Checkpoint data rebuilt from the state, expiration time and UTF-8 key read from the stream.
+     * @throws IOException If the stream ends before a complete record is read or reading fails.
+     */
+    public static S3CheckpointData fromStream(InputStream in) throws IOException {
+        byte[] buf = new byte[8];
+
+        read(in, buf, 4);
+
+        byte[] state = new byte[U.bytesToInt(buf, 0)];
+
+        read(in, state, state.length);
+
+        read(in, buf, 8);
+
+        long expTime = U.bytesToLong(buf, 0);
+
+        read(in, buf, 4);
+
+        byte[] keyBytes = new byte[U.bytesToInt(buf, 0)];
+
+        read(in, keyBytes, keyBytes.length);
+
+        return new S3CheckpointData(state, expTime, new String(keyBytes, java.nio.charset.Charset.forName("UTF-8")));
+    }
+
+    /**
+     * @param in Input stream.
+     * @param buf Buffer that is filled starting at offset 0.
+     * @param len Exact number of bytes to read; short reads are retried until {@code len} bytes arrive.
+     * @throws IOException If the stream ends before {@code len} bytes are read.
+     */
+    private static void read(InputStream in, byte[] buf, int len) throws IOException {
+        for (int off = 0, cnt; off < len; off += cnt) { // One read() call may return fewer bytes than asked.
+            if ((cnt = in.read(buf, off, len - off)) < 0)
+                throw new IOException("End of stream reached.");
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(S3CheckpointData.class, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/236f40f6/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointSpi.java b/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointSpi.java
index 8251d30..9a43fc6 100644
--- a/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointSpi.java
+++ b/modules/aws/src/main/java/org/apache/ignite/spi/checkpoint/s3/S3CheckpointSpi.java
@@ -27,8 +27,6 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.checkpoint.*;
@@ -124,10 +122,6 @@ public class S3CheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi,
     /** Suffix to use in bucket name generation. */
     public static final String DFLT_BUCKET_NAME_SUFFIX = "default-bucket";
 
-    /** Marshaller. */
-    @GridToStringExclude
-    private final Marshaller marsh = new JdkMarshaller();
-
     /** Client to interact with S3 storage.
*/ @GridToStringExclude private AmazonS3 s3; @@ -464,7 +458,11 @@ public class S3CheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, InputStream in = obj.getObjectContent(); try { - return marsh.unmarshal(in, U.gridClassLoader()); + return S3CheckpointData.fromStream(in); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to unmarshal S3CheckpointData [bucketName=" + + bucketName + ", key=" + key + ']', e); } finally { U.closeQuiet(in); @@ -492,7 +490,7 @@ public class S3CheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi, if (log.isDebugEnabled()) log.debug("Writing data to S3 [bucket=" + bucketName + ", key=" + data.getKey() + ']'); - byte[] buf = marsh.marshal(data); + byte[] buf = data.toBytes(); ObjectMetadata meta = new ObjectMetadata();