Author: fhanik
Date: Fri May 19 11:08:21 2006
New Revision: 407868

URL: http://svn.apache.org/viewvc?rev=407868&view=rev
Log:
Added in two phase commit interceptor, this one will work in such a way that it 
either the message gets delivered to all
or none. Of course, if the second message fails part through, then it wont work

Added:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=407868&r1=407867&r2=407868&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java
 Fri May 19 11:08:21 2006
@@ -32,6 +32,12 @@
     }
 
     public UniqueId(byte[] id) {
+        this.id = id;
+    }
+    
+    public UniqueId(byte[] id, int offset, int length) {
+        this.id = new byte[length];
+        System.arraycopy(id,offset,this.id,0,length);
     }
     
     public int hashCode() {
@@ -49,6 +55,10 @@
             else result = Arrays.equals(this.id,uid.id);
         }//end if
         return result;
+    }
+    
+    public byte[] getBytes() {
+        return id;
     }
 
 }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java?rev=407868&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java
 Fri May 19 11:08:21 2006
@@ -0,0 +1,122 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.util.HashMap;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.UniqueId;
+
+/**
+ * <p>Title: </p>
+ *
+ * <p>Description: </p>
+ *
+ * <p>Copyright: Copyright (c) 2005</p>
+ *
+ * <p>Company: </p>
+ *
+ * @author not attributable
+ * @version 1.0
+ */
+public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase {
+
+    public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, 
-60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4};
+    public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, 
-31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56};
+    private static org.apache.commons.logging.Log log = 
org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class);
+
+    protected HashMap messages = new HashMap();
+    protected long expire = 1000 * 60; //one minute expiration
+    protected boolean deepclone = true;
+
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws
+        ChannelException {
+        if (okToProcess(msg.getOptions())) {
+            super.sendMessage(destination, msg, null);
+            ChannelMessage confirmation = null;
+            if ( deepclone ) confirmation = (ChannelMessage)msg.deepclone();
+            else confirmation = (ChannelMessage)msg.clone();
+            confirmation.getMessage().reset();
+            UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0);
+            confirmation.getMessage().append(START_DATA,0,START_DATA.length);
+            
confirmation.getMessage().append(msg.getUniqueId(),0,msg.getUniqueId().length);
+            confirmation.getMessage().append(END_DATA,0,END_DATA.length);
+            super.sendMessage(destination,confirmation,payload);
+        } else {
+            super.sendMessage(destination, msg, payload);
+        }
+    }
+
+    public void messageReceived(ChannelMessage msg) {
+        if (okToProcess(msg.getOptions())) {
+            if ( msg.getMessage().getLength() == 
(START_DATA.length+msg.getUniqueId().length+END_DATA.length) &&
+                 
Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length)
 &&
+                 
Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length)
 ) {
+                UniqueId id = new 
UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length);
+                MapEntry original = (MapEntry)messages.get(id);
+                if ( original != null ) {
+                    super.messageReceived(original.msg);
+                    messages.remove(id);
+                } else log.warn("Received a confirmation, but original message 
is missing. Id:"+Arrays.toString(id.getBytes()));
+            } else {
+                UniqueId id = new UniqueId(msg.getUniqueId());
+                MapEntry entry = new 
MapEntry((ChannelMessage)msg.deepclone(),id,System.currentTimeMillis());
+                messages.put(id,entry);
+            }
+        } else {
+            super.messageReceived(msg);
+        }
+    }
+
+    public boolean getDeepclone() {
+        return deepclone;
+    }
+
+    public long getExpire() {
+        return expire;
+    }
+
+    public void setDeepclone(boolean deepclone) {
+        this.deepclone = deepclone;
+    }
+
+    public void setExpire(long expire) {
+        this.expire = expire;
+    }
+    
+    public static class MapEntry {
+        public ChannelMessage msg;
+        public UniqueId id;
+        public long timestamp;
+        
+        public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) {
+            this.msg = msg;
+            this.id = id;
+            this.timestamp = timestamp;
+        }
+        public boolean expired(long now, long expiration) {
+            return (now - timestamp ) > expiration;
+        }
+
+    }
+
+}
\ No newline at end of file

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=407868&r1=407867&r2=407868&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 Fri May 19 11:08:21 2006
@@ -635,15 +635,27 @@
         private boolean proxy;
         private Member[] backupNodes;
 
-        private Serializable key;
-        private Serializable value;
+        private Object key;
+        private Object value;
 
-        public MapEntry(Serializable key, Serializable value) {
+        public MapEntry(Object key, Object value) {
             setKey(key);
             setValue(value);
             
         }
-
+        
+        public boolean isKeySerializable() {
+            return (key == null) || (key instanceof Serializable);
+        }
+        
+        public boolean isValueSerializable() {
+            return (value==null) || (value instanceof Serializable);
+        }
+        
+        public boolean isSerializable() {
+            return isKeySerializable() && isValueSerializable();
+        }
+        
         public boolean isBackup() {
             return backup;
         }

Added: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=407868&view=auto
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
 (added)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java
 Fri May 19 11:08:21 2006
@@ -0,0 +1,66 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.util;
+
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.ChannelMessage;
+
+/**
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class Arrays {
+    
+    public static boolean contains(byte[] source, int srcoffset, byte[] key, 
int keyoffset, int length) {
+        if ( srcoffset < 0 || srcoffset >= source.length) throw new 
ArrayIndexOutOfBoundsException("srcoffset is out of bounds.");
+        if ( keyoffset < 0 || keyoffset >= key.length) throw new 
ArrayIndexOutOfBoundsException("keyoffset is out of bounds.");
+        if ( length >= (key.length-keyoffset) ) throw new 
ArrayIndexOutOfBoundsException("not enough data elements in the key, length is 
out of bounds.");
+        //we don't have enough data to validate it
+        if ( length >= (source.length-srcoffset) ) return false;
+        boolean match = true;
+        int pos = keyoffset;
+        for ( int i=srcoffset; match && i<length; i++ ) {
+            match = (source[i] == key[pos++]);
+        }
+        return match;
+    }
+    
+    public static String toString(byte[] data) {
+        return toString(data,0,data.length);
+    }
+
+    public static String toString(byte[] data, int offset, int length) {
+        StringBuffer buf = new StringBuffer("{");
+        buf.append(data[offset++]);
+        length--;
+        for ( int i=offset; i<length; i++ ) {
+            buf.append(", ").append(data[i]);
+        }
+        buf.append("}");
+        return buf.toString();
+    }
+    
+    public static UniqueId getUniqudId(ChannelMessage msg) {
+        return new UniqueId(msg.getUniqueId());
+    }
+
+    public static UniqueId getUniqudId(byte[] data) {
+        return new UniqueId(data);
+    }
+    
+    
+    
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: 
http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407868&r1=407867&r2=407868&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri May 19 11:08:21 2006
@@ -42,6 +42,10 @@
 Code Tasks:
 ===========================================
 
+48. Periodic refresh of the replicated map (primary ->backup)
+
+47. Delta(session) versioning. increase version number each time, easier to 
keep maps in sync
+
 41. Build a tipi that is a soft membership
 
 38. Make the AbstractReplicatedMap accept non serializable elements, but just 
don't replicate them
@@ -56,8 +60,6 @@
 34. Configurable payload for the membership heartbeat, so that the app can 
decide what to heartbeat.
     such as JMX management port, ala Andy Piper's suggestion.
 
-33. PerfectFDInterceptor, when a member is reported missing, first check TCP 
path too.
-
 32. Replicated JNDI entries in Tomcat in the format
     cluster:<map name>/<entry key> for example
     cluster:myapps/db/shared/dbinfo
@@ -287,4 +289,7 @@
 
 45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check
     for members dropping on the same thread
+Notes: Completed
+
+33. TcpFailureDetector, when a member is reported missing, first check TCP 
path too.
 Notes: Completed



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to