Author: fhanik
Date: Wed Mar  8 13:23:10 2006
New Revision: 384333

URL: http://svn.apache.org/viewcvs?rev=384333&view=rev
Log:
Working on the lazy replicated hash map

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/InterceptorPayload.java
 Wed Mar  8 13:23:10 2006
@@ -15,19 +15,11 @@
  */
 package org.apache.catalina.tribes;
 
+import java.util.Stack;
+
 /**
- * <p>Title: </p>
- *
- * <p>Description: </p>
- *
- * <p>Copyright: Copyright (c) 2005</p>
- *
- * <p>Company: </p>
- *
  * @author Filip Hanik
  * @version 1.0
  */
-public class InterceptorPayload {
-    public InterceptorPayload() {
-    }
+public class InterceptorPayload extends Stack {
 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java?rev=384333&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/DirectByteArrayOutputStream.java
 Wed Mar  8 13:23:10 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Byte array output stream that exposes the byte array directly
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class DirectByteArrayOutputStream extends OutputStream {
+    
+    private XByteBuffer buffer;
+    
+    public DirectByteArrayOutputStream(int size) {
+        buffer = new XByteBuffer(size,false);
+    }
+
+    /**
+     * Writes the specified byte to this output stream.
+     *
+     * @param b the <code>byte</code>.
+     * @throws IOException if an I/O error occurs. In particular, an
+     *   <code>IOException</code> may be thrown if the output stream has
+     *   been closed.
+     * @todo Implement this java.io.OutputStream method
+     */
+    public void write(int b) throws IOException {
+        buffer.append((byte)b);
+    }
+    
+    public int size() {
+        return buffer.getLength();
+    }
+    
+    public byte[] getArrayDirect() {
+        return buffer.getBytesDirect();
+    }
+    
+    public byte[] getArray() {
+        return buffer.getBytes();
+    }
+
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/XByteBuffer.java
 Wed Mar  8 13:23:10 2006
@@ -515,9 +515,14 @@
     
     public static Serializable deserialize(byte[] data) 
         throws IOException, ClassNotFoundException, ClassCastException {
+        return deserialize(data,0,data.length);
+    }
+    
+    public static Serializable deserialize(byte[] data, int offset, int 
length) 
+            throws IOException, ClassNotFoundException, ClassCastException {
         Object message = null;
         if (data != null) {
-            InputStream  instream = new ByteArrayInputStream(data);
+            InputStream  instream = new 
ByteArrayInputStream(data,offset,length);
             ReplicationStream stream = new ReplicationStream(instream,new 
ClassLoader[] {XByteBuffer.class.getClassLoader()});
             message = stream.readObject();
             instream.close();

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReplicationTransmitter.java
 Wed Mar  8 13:23:10 2006
@@ -40,8 +40,7 @@
  * @version $Revision: 379956 $ $Date: 2006-02-22 16:57:35 -0600 (Wed, 22 Feb 2006) $
  */
 public class ReplicationTransmitter implements ChannelSender,IDynamicProperty {
-    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory
-            .getLog(ReplicationTransmitter.class);
+    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(ReplicationTransmitter.class);
 
     /**
      * The descriptive information about this implementation.

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java?rev=384333&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Diffable.java
 Wed Mar  8 13:23:10 2006
@@ -0,0 +1,35 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.Serializable;
+import java.io.IOException;
+
+/**
+ * 
+ *
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public interface Diffable extends Serializable {
+    
+    public byte[] getDiff() throws IOException;
+    
+    public void applyDiff(byte[] diff, int offset, int length) throws 
IOException;
+    
+    public boolean hasDiff();
+    
+}
\ No newline at end of file

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384333&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Wed Mar  8 13:23:10 2006
@@ -0,0 +1,354 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.catalina.tribes.Channel;
+import java.io.Serializable;
+import org.apache.catalina.tribes.Member;
+import java.io.UnsupportedEncodingException;
+import java.io.IOException;
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.catalina.tribes.ChannelListener;
+import java.util.Collection;
+import org.apache.catalina.tribes.MembershipListener;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class LazyReplicatedMap extends LinkedHashMap 
+    implements RpcCallback, ChannelListener, MembershipListener {
+    protected static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(LazyReplicatedMap.class);
+    
+//------------------------------------------------------------------------------
    
+//              INSTANCE VARIABLES
+//------------------------------------------------------------------------------
   
+
+    private Channel channel;
+    private RpcChannel rpcChannel;
+    
+    
+//------------------------------------------------------------------------------
    
+//              CONSTRUCTORS / DESTRUCTORS
+//------------------------------------------------------------------------------
   
+    public LazyReplicatedMap(Channel channel, String mapContextName, int 
initialCapacity, float loadFactor) {
+        super(initialCapacity,loadFactor);
+        init(channel,mapContextName);
+    }
+
+    public LazyReplicatedMap(Channel channel, String mapContextName, int 
initialCapacity) {
+        super(initialCapacity);
+        init(channel,mapContextName);
+    }
+
+    public LazyReplicatedMap(Channel channel, String mapContextName) {
+        super();
+        init(channel,mapContextName);
+    }
+    
+    void init(Channel channel, String mapContextName) {
+        final String chset = "ISO-8859-1";
+        this.channel = channel;
+        try {
+            this.rpcChannel = new RpcChannel(mapContextName.getBytes(chset), 
channel, this);
+        }catch (UnsupportedEncodingException x) {
+            log.warn("Unable to encode mapContextName["+mapContextName+"] 
using getBytes("+chset+") using default getBytes()",x);
+            this.rpcChannel = new RpcChannel(mapContextName.getBytes(), 
channel, this);
+        }
+        this.channel.addChannelListener(this);
+        this.channel.addMembershipListener(this);
+        
+    }
+    
+    public void breakDown() {
+        finalize();
+    }
+    
+    public void finalize() {
+        if ( this.rpcChannel!=null ) {
+            this.rpcChannel.breakDown();
+        }
+        if ( this.channel != null ) {
+            this.channel.removeChannelListener(this);
+            this.channel.removeMembershipListener(this);
+        }
+        this.rpcChannel = null;
+        this.channel = null;
+    }
+    
+//------------------------------------------------------------------------------
    
+//              GROUP COM INTERFACES
+//------------------------------------------------------------------------------
   
+    /**
+     * 
+     * @param msg Serializable
+     * @return Serializable - null if no reply should be sent
+     */
+    public Serializable replyRequest(Serializable msg, Member sender) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * If the reply has already been sent to the requesting thread,
+     * the rpc callback can handle any data that comes in after the fact.
+     * @param msg Serializable
+     * @param sender Member
+     */
+    public void leftOver(Serializable msg, Member sender) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void messageReceived(Serializable msg, Member sender) {
+        throw new UnsupportedOperationException();
+    }
+    
+    public boolean accept(Serializable msg, Member sender) {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void memberAdded(Member member) {
+        
+    }
+    public void memberDisappeared(Member member) {
+        
+    }
+    
+//------------------------------------------------------------------------------
    
+//              METHODS TO OVERRIDE    
+//------------------------------------------------------------------------------
+
+    public Object get(Object key) {
+        return super.get(key);
+    }
+
+    public boolean containsKey(Object key) {
+        return super.containsKey(key);
+    }
+
+    public Object put(Object key, Object value) {
+        return super.put(key,value);
+    }
+
+    public void putAll(Map m) {
+        super.putAll(m);
+    }
+
+    public Object remove(Object key) {
+        return super.remove(key);
+    }
+
+    public void clear() {
+        super.clear();
+    }
+
+    public boolean containsValue(Object value) {
+        return super.containsValue(value);
+    }
+
+    public Object clone() {
+        return super.clone();
+    }
+    
+    public Set entrySet() {
+        return super.entrySet();
+    }
+    
+    public Set keySet() {
+        return super.keySet();
+    }
+    
+    public int size() {
+        return super.size();
+    }
+    
+    protected boolean removeEldestEntry(Map.Entry eldest) {
+        return false;
+    }
+    
+    public boolean isEmpty() {
+        return super.isEmpty();
+    }
+    
+    public Collection values() {
+        return super.values();
+    }
+    
+
+//------------------------------------------------------------------------------
    
+//                Map Entry class    
+//------------------------------------------------------------------------------
+    public static class MapEntry implements Map.Entry {
+        private boolean backup;
+        private boolean proxy;
+        private Member  backupNode;
+        
+        private Serializable key;
+        private Serializable value;
+        
+        public MapEntry(Serializable key, Serializable value) {
+            this.key = key;
+            this.value = value;
+        }
+        
+        public boolean isBackup() {
+            return backup;
+        }
+        
+        public void setBackup(boolean backup) {
+            this.backup = backup;
+        }
+        
+        public boolean isProxy() {
+            return proxy;
+        }
+
+        public void setProxy(boolean proxy) {
+            this.proxy = proxy;
+        }
+
+        public boolean isDiffable() {
+            return (value instanceof Diffable);
+        }
+        
+        public void setBackupNode(Member node) {
+            this.backupNode = node;
+        }
+        
+        public Member getBackupNode() {
+            return backupNode;
+        }
+        
+        
+        
+        public Object getValue() {
+            return value;
+        }
+        
+        public Object setValue(Object value) {
+            Object old = this.value;
+            this.value = (Serializable)value;
+            return old;
+        }
+        
+        public Object getKey() {
+            return key;
+        }
+
+
+        public byte[] getDiff() throws IOException {
+            if ( isDiffable() ) {
+                return ((Diffable)value).getDiff();
+            } else {
+                return getData();
+            }
+        }
+        
+        public int hashCode() {
+            return key.hashCode();
+        }
+        
+        public boolean equals(Object o) {
+            return key.equals(o);
+        }
+        
+        /**
+         * returns the entire object as a byte array
+         * @return byte[]
+         * @throws IOException
+         */
+        public byte[] getData() throws IOException {
+            return (new ObjectStreamable(value)).getBuf().getArray();
+        }
+        
+        /**
+         * apply a diff, or an entire object
+         * @param data byte[]
+         * @param offset int
+         * @param length int
+         * @param diff boolean
+         * @throws IOException
+         * @throws ClassNotFoundException
+         */
+        public void apply(byte[] data, int offset, int length, boolean diff) 
throws IOException, ClassNotFoundException {
+            if ( isDiffable() && diff ) {
+                ((Diffable)value).applyDiff(data,offset,length);
+            } else if ( length == 0 ) {
+                value = null;
+                proxy = true;
+            } else {
+                value = XByteBuffer.deserialize(data,offset,length);
+            }
+        }
+        
+    }
+
+//------------------------------------------------------------------------------
    
+//                streamable class    
+//------------------------------------------------------------------------------
+    
+    public static class ObjectStreamable implements Streamable {
+        private DirectByteArrayOutputStream buf;
+        private int pos=0;
+        public ObjectStreamable(Serializable value) throws IOException {
+            buf = new DirectByteArrayOutputStream(1024);
+            ObjectOutputStream out = new ObjectOutputStream(buf);
+            out.writeObject(value);
+            out.flush();
+        }
+        
+        /**
+         * returns true if the stream has reached its end
+         * @return boolean
+         */
+        public synchronized boolean eof() {
+            return (pos>=buf.size());
+    
+        }
+    
+        /**
+         * write data into the byte array starting at offset, maximum bytes written are (data.length-offset)
+         * @param data byte[] - the array to read data into
+         * @param offset int - start position for writing data
+         * @return int - the number of bytes written into the data buffer
+         */
+        public synchronized int write(byte[] data, int offset) throws 
IOException {
+            int length = Math.min(data.length-offset,buf.size()-pos);
+            System.arraycopy(buf.getArrayDirect(),pos,data,offset,length);
+            pos = pos + length;
+            return length;
+        }
+        
+        public DirectByteArrayOutputStream getBuf() {
+            return buf;
+        }
+        
+        public int size() {
+            return buf.size();
+        }
+        
+        public int pos() {
+            return pos;
+        }
+
+    }
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=384333&r1=384332&r2=384333&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 Wed Mar  8 13:23:10 2006
@@ -123,6 +123,14 @@
         }//end if
     }
     
+    public void breakDown() {
+        channel.removeChannelListener(this);
+    }
+    
+    public void finalize() {
+        breakDown();
+    }
+    
     public boolean accept(Serializable msg, Member sender) {
         if ( msg instanceof RpcMessage ) {
             RpcMessage rmsg = (RpcMessage)msg;

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java?rev=384333&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/Streamable.java
 Wed Mar  8 13:23:10 2006
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.IOException;
+
+/**
+ * Example usage:
+ * <code><pre>
+ * byte[] data = new byte[1024];
+ * Streamable st = ....;
+ * while ( !st.eof() ) {
+ * &nbsp;&nbsp;int length = st.write(data,0);
+ * &nbsp;&nbsp;String s = new String(data,0,length);
+ * &nbsp;&nbsp;System.out.println(s);
+ * }
+ * </pre></code>
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public interface Streamable {
+    
+    /**
+     * returns true if the stream has reached its end
+     * @return boolean
+     */
+    public boolean eof();
+   
+    /**
+     * write data into the byte array starting at offset, maximum bytes written are (data.length-offset)
+     * @param data byte[] - the array to read data into
+     * @param offset int - start position for writing data
+     * @return int - the number of bytes written into the data buffer
+     */
+    public int write(byte[] data, int offset) throws IOException;
+   
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to