Author: mturk Date: Thu Aug 18 11:30:09 2011 New Revision: 1159172 URL: http://svn.apache.org/viewvc?rev=1159172&view=rev Log: Add initial memory stream (bidirectional Byte/ByteBuffer streams)
Added:
    commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java (with props)
    commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c (with props)
    commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java (with props)
Modified:
    commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
    commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in

Added: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java?rev=1159172&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java (added)
+++ commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java Thu Aug 18 11:30:09 2011
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.commons.runtime.io;
+
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.nio.ByteBuffer;
+import org.apache.commons.runtime.InvalidArgumentException;
+import org.apache.commons.runtime.Pointer;
+import org.apache.commons.runtime.util.Utils;
+
+/**
+ * Bidirectional Memory Stream.
+ * <p>
+ * This class uses native heap memory for internal data buffer and its
+ * capacity is limited to {@code Integer.MAX_VALUE} bytes. Stream dynamically
+ * allocates and extends the memory as needed.
+ * </p>
+ */
+public final class MemoryStream extends Stream
+    implements Streamable, Reader, Writer
+{
+
+    private long sb;
+    private int rdpos;
+
+    private static native long alloc0(int size);
+    private static native void free0(long sb);
+    private static native void clear0(long sb);
+
+    private static native void flush0(long sb);
+    private static native void finish0(long sb);
+    private static native boolean finished0(long sb);
+    private static native int length0(long sb);
+
+    private static native int read0(long sb, int pos);
+
+    // Native write methods
+    private static native int write0(long sb, int ch)
+        throws IOException;
+    private static native int write1(long sb, byte[] buf, int off, int len)
+        throws IOException;
+    private static native int write2(long sb, long ptr, int len)
+        throws IOException;
+    private static native int write3(long sb, ByteBuffer buf, int off, int len)
+        throws IOException;
+    private static native long write4(long sb, byte[][] buf, int off, int len)
+        throws IOException;
+    private static native long write5(long sb, ByteBuffer[] buf, int off, int len)
+        throws IOException;
+
+    /**
+     * Creates a new memory stream instance with default buffer size.
+     */
+    public MemoryStream()
+    {
+        this(0);
+    }
+
+    /**
+     * Creates a new memory stream instance with given buffer size.
+     */
+    public MemoryStream(int size)
+    {
+        if (size < 32)
+            size = 32;
+        sb = alloc0(size);
+        if (sb == 0L)
+            throw new OutOfMemoryError();
+    }
+
+    /**
+     * Returns the number of bytes that are available before this stream will
+     * block.
+     *
+     * @return the number of bytes available before blocking.
+     * @throws IOException
+     *          if an error occurs in this stream.
+     */
+    public synchronized int available()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return length0(sb) - rdpos;
+    }
+
+    /**
+     * Closes the object and release any system resources it holds. If the
+     * object has already been closed, then invoking this method has no effect.
+     *
+     * @throws IOException
+     *          if any error occurs when closing the object.
+     */
+    public synchronized void close()
+        throws IOException
+    {
+        if (sb != 0L) {
+            free0(sb);
+            sb = 0L;
+        }
+    }
+    /**
+     * Flush the underlying stream metadata.
+     * <p>
+     * {@code flush} transfers all modified metadata of the stream object
+     * referred to by {@code this} stream to the disk device
+     * (or other storage device) where that object resides.
+     * The call blocks until the device reports that the transfer has
+     * completed. It also flushes metadata information associated with
+     * {@code this} Descriptor.
+     * </p>
+     *
+     * @throws SyncFailedException when the object cannot be flushed.
+     * @throws IOException if an I/O error occurs.
+     */
+    public synchronized void flush()
+        throws SyncFailedException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        flush0(sb);
+    }
+
+    /**
+     * Sync the underlying stream by writing any buffered data.
+     * <p>
+     * {@code sync} transfers all modified in-core data of the stream object
+     * referred to by {@code this} stream to the disk device
+     * (or other storage device) where that object resides.
+     * The call blocks until the device reports that the transfer has
+     * completed. It also flushes metadata information associated with
+     * {@code this} Descriptor.
+     * </p>
+     *
+     * @throws SyncFailedException when the object cannot be flushed.
+     * @throws IOException if an I/O error occurs.
+     */
+    public synchronized void sync()
+        throws SyncFailedException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        flush0(sb);
+    }
+
+    @Override
+    public synchronized boolean closed()
+    {
+        return sb == 0L;
+    }
+
+    /**
+     * Test if {@code this} stream is valid.
+     *
+     * @return {@code true} if the stream represents a valid,
+     *         open file, socket, or other I/O object; {@code false} otherwise.
+     *
+     */
+    public synchronized boolean valid()
+    {
+        return sb != 0L;
+    }
+
+    /**
+     * Test whether or not every I/O operation on {@code this} stream will
+     * block until it completes.
+     *
+     * @return {@code true} if, and only if, this stream
+     *         is in blocking mode.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    public boolean isBlocking()
+        throws IOException
+    {
+        return false;
+    }
+
+    /**
+     * Skips over {@code count} bytes in this stream. Less than {@code count}
+     * bytes are skipped if the end of the stream is reached or an exception is
+     * thrown during the operation. A {@code count} that is negative or above
+     * {@code Integer.MAX_VALUE} is rejected with {@code InvalidArgumentException}.
+     *
+     * @param count
+     *          The number of bytes to skip.
+     *
+     * @return The number of bytes actually skipped.
+     *
+     * @throws ClosedDescriptorException
+     *          If this stream is closed.
+     * @throws OperationWouldBlockException
+     *          If the stream is in nonblocking mode and the operation
+     *          would block.
+     * @throws OperationNotSupportedException
+     *          If the stream is not seekable.
+     * @throws TimeoutException
+     *          If operation times out.
+     * @throws IOException
+     *          If some other I/O error occurs.
+     */
+    public synchronized long skip(long count)
+        throws IOException
+    {
+        if (count < 0L || count > Integer.MAX_VALUE)
+            throw new InvalidArgumentException();
+        if (closed())
+            throw new ClosedDescriptorException();
+        long len = length0(sb);
+        if (count + rdpos >= len)
+            count = len - rdpos;
+        rdpos += (int)count;
+        return count;
+    }
+
+    @Override
+    public boolean canSeek()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean canRead()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean canWrite()
+    {
+        return true;
+    }
+
+    @Override
+    public synchronized boolean eof()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return rdpos >= length0(sb);
+    }
+
+    @Override
+    public synchronized int read()
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        int rv = read0(sb, rdpos);
+        if (rv != -1)
+            rdpos++;
+        return rv;
+    }
+
+    @Override
+    public synchronized int read(byte[] buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        return 0;
+    }
+
+    @Override
+    public synchronized int read(long address, int count)
+        throws NullPointerException, IndexOutOfBoundsException, IOException
+    {
+        return 0;
+    }
+
+    @Override
+    public synchronized int read(ByteBuffer buffer)
+        throws IndexOutOfBoundsException, IOException
+    {
+        return 0;
+    }
+
+    // === Writer methods
+
+    @Override
+    public synchronized int write(int b)
+        throws IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write0(sb, b);
+    }
+
+    @Override
+    public synchronized int write(byte[] buffer, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write1(sb, buffer, offset, count);
+    }
+
+    @Override
+    public synchronized int write(long address, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write2(sb, address, count);
+    }
+
+    @Override
+    public synchronized int write(ByteBuffer buffer)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write3(sb, buffer, buffer.position(), buffer.remaining());
+    }
+
+    @Override
+    public synchronized long write(byte[][] array, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write4(sb, array, offset, count);
+    }
+
+    @Override
+    public synchronized long write(ByteBuffer[] array, int offset, int count)
+        throws IndexOutOfBoundsException, IOException
+    {
+        if (closed())
+            throw new ClosedDescriptorException();
+        return write5(sb, array, offset, count);
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/io/MemoryStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in?rev=1159172&r1=1159171&r2=1159172&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.msc.in Thu Aug 18 11:30:09 2011
@@ -127,6 +127,7 @@ LIBSOURCES=\
     $(TOPDIR)\shared\error.c \
     $(TOPDIR)\shared\iofd.c \
     $(TOPDIR)\shared\memory.c \
+    $(TOPDIR)\shared\memstream.c \
     $(TOPDIR)\shared\native.c \
     $(TOPDIR)\shared\netaddr.c \
     $(TOPDIR)\shared\netserv.c \

Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in?rev=1159172&r1=1159171&r2=1159172&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in (original)
+++ commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in Thu Aug 18 11:30:09 2011
@@ -122,6 +122,7 @@ LIBSOURCES=\
     $(TOPDIR)/shared/error.c \
     $(TOPDIR)/shared/iofd.c \
     $(TOPDIR)/shared/memory.c \
+    $(TOPDIR)/shared/memstream.c \
$(TOPDIR)/shared/native.c \
     $(TOPDIR)/shared/netaddr.c \
     $(TOPDIR)/shared/netserv.c \

Added: commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c?rev=1159172&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c (added)
+++ commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c Thu Aug 18 11:30:09 2011
@@ -0,0 +1,279 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "acr/memory.h"
+#include "acr/pointer.h"
+#include "acr/clazz.h"
+#include "acr/sbuf.h"
+
+ACR_IO_EXPORT(jlong, MemoryStream, alloc0)(JNI_STDARGS, jint len)
+{
+    acr_sb_t *sb = AcrSbNew(0, len, ACR_SB_AUTOEXTEND);
+    if (sb == 0)
+        return 0;
+    else
+        return P2J(sb);
+}
+
+ACR_IO_EXPORT(void, MemoryStream, free0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    AcrSbFree(sb);
+}
+
+ACR_IO_EXPORT(void, MemoryStream, clear0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    AcrSbClear(sb);
+}
+
+ACR_IO_EXPORT(jlong, MemoryStream, detach0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return P2J(AcrSbDetach(sb));
+}
+
+ACR_IO_EXPORT(jlong, MemoryStream, data0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return P2J(AcrSbData(sb));
+}
+
+ACR_IO_EXPORT(jlong, MemoryStream, detach1)(JNI_STDARGS, jlong sh, jint len)
+{
+    char *m;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    m = AcrSbDetach(sb);
+    AcrSbInit(sb, 0, len, 0);
+    sb->s_flags |= ACR_SB_DYNSTRUCT;
+    return P2J(m);
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, length0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbLen(sb);
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, capacity0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbCapacity(sb);
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, setmax0)(JNI_STDARGS, jlong sh, jint len)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbSetMax(sb, len);
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, setpos0)(JNI_STDARGS, jlong sh, jint pos)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbSetPos(sb, pos);
+}
+
+ACR_IO_EXPORT(void, MemoryStream, flush0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    AcrSbFlush(sb);
+}
+
+ACR_IO_EXPORT(void, MemoryStream, finish0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    AcrSbFinish(sb);
+}
+
+ACR_IO_EXPORT(jboolean, MemoryStream, finished0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbDone(sb) ? JNI_TRUE : JNI_FALSE;
+}
+
+ACR_IO_EXPORT(jboolean, MemoryStream, overflow0)(JNI_STDARGS, jlong sh)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+    return AcrSbOverflowed(sb) ? JNI_TRUE : JNI_FALSE;
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, read0)(JNI_STDARGS, jlong sh, jint pos)
+{
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    if (pos < 0 || pos >= sb->s_len)
+        return -1;
+    return (unsigned char)sb->s_buf[pos];
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, write0)(JNI_STDARGS, jlong sh, jint b)
+{
+    int rc;
+    int wr = 1;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    rc = AcrSbPutc(sb, b);
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, write1)(JNI_STDARGS, jlong sh,
+                                          jbyteArray buf,
+                                          jint off, jint len)
+{
+    int wr = len;
+    int rc = 0;
+    char *bb = 0;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    bb = JARRAY_CRITICAL(char, buf);
+    if (bb == 0)
+        rc = ACR_EINVAL;
+    else
+        rc = AcrSbCatb(sb, bb + off, len);
+    RELEASE_CRITICAL(buf, bb);
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, write2)(JNI_STDARGS, jlong sh,
+                                          jlong bp, jint len)
+{
+    int wr = len;
+    int rc = 0;
+    char *bb = J2P(bp, char *);
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    if (bp == 0)
+        rc = ACR_EINVAL;
+    else
+        rc = AcrSbCatb(sb, bb, len);
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}
+
+ACR_IO_EXPORT(jint, MemoryStream, write3)(JNI_STDARGS, jlong sh,
+                                          jobject buf,
+                                          jint off, jint len)
+{
+    int wr = len;
+    int rc = 0;
+    char *bb = 0;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    bb = (char *)(*env)->GetDirectBufferAddress(env, buf);
+    if (bb == 0)
+        rc = ACR_EINVAL;
+    else
+        rc = AcrSbCatb(sb, bb + off, len);
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}
+
+ACR_IO_EXPORT(jlong, MemoryStream, write4)(JNI_STDARGS, jlong sh,
+                                           jobjectArray vec,
+                                           jint off,
+                                           jint len)
+{
+    int i;
+    int rc = 0;
+    jlong wr = 0;
+    int blen;
+    jbyte *bb;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    for (i = 0; i < len; i++) {
+        jobject ba = (*env)->GetObjectArrayElement(env, vec, (jsize)(i + off));
+        if (ba == 0) {
+            /* XXX: What's the correct error ?
+             */
+            rc = ACR_EINVAL;
+            break;
+        }
+        blen = (int)(*env)->GetArrayLength(env, ba);
+        bb = (*env)->GetByteArrayElements(env, ba, 0);
+        if (bb == 0) {
+            /* NULL means the JVM could not expose the array elements. */
+            (*env)->DeleteLocalRef(env, ba);
+            rc = ACR_EINVAL;
+            break;
+        }
+        rc = AcrSbCatb(sb, bb, blen);
+        (*env)->ReleaseByteArrayElements(env, ba, bb, JNI_ABORT);
+        (*env)->DeleteLocalRef(env, ba);
+        if (rc != 0)
+            break;
+        wr += blen;
+    }
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}
+
+ACR_IO_EXPORT(jlong, MemoryStream, write5)(JNI_STDARGS, jlong sh,
+                                           jobjectArray vec,
+                                           jint off,
+                                           jint len)
+{
+    int i;
+    int rc = 0;
+    jlong wr = 0;
+    int blen;
+    jbyte *bb;
+    acr_sb_t *sb = J2P(sh, acr_sb_t *);
+
+    for (i = 0; i < len; i++) {
+        jobject ba = (*env)->GetObjectArrayElement(env, vec, (jsize)(i + off));
+        if (ba == 0) {
+            /* XXX: What's the correct error ?
+             */
+            rc = ACR_EINVAL;
+            break;
+        }
+        blen = (int)(*env)->GetDirectBufferCapacity(env, ba);
+        bb = (*env)->GetDirectBufferAddress(env, ba);
+        if (bb == 0 || blen < 0) {
+            /* Not a direct buffer: there is no native address to copy from. */
+            (*env)->DeleteLocalRef(env, ba);
+            rc = ACR_EINVAL;
+            break;
+        }
+        rc = AcrSbCatb(sb, bb, blen);
+        (*env)->DeleteLocalRef(env, ba);
+        if (rc != 0)
+            break;
+        wr += blen;
+    }
+    if (rc != 0) {
+        wr = -1;
+        ACR_THROW_NET_ERROR(rc);
+    }
+    return wr;
+}

Propchange: commons/sandbox/runtime/trunk/src/main/native/shared/memstream.c
------------------------------------------------------------------------------
    svn:eol-style = native

Added: commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java
URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java?rev=1159172&view=auto
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java (added)
+++ 
commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java Thu Aug 18 11:30:09 2011
@@ -0,0 +1,43 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.commons.runtime.io;
+
+import org.testng.annotations.*;
+import org.testng.Assert;
+import java.io.IOException;
+
+public class TestMemoryStream extends Assert
+{
+
+    @Test(groups = { "core" })
+    public void simpleWrite()
+        throws Throwable
+    {
+        MemoryStream s = new MemoryStream();
+        try {
+            s.write(32);
+            assertEquals(s.read(), 32);
+            assertEquals(s.read(), -1);
+            assertTrue(s.eof());
+            s.write(33);
+            assertFalse(s.eof());
+        } finally {
+            s.close();
+        }
+    }
+
+}

Propchange: commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestMemoryStream.java
------------------------------------------------------------------------------
    svn:eol-style = native