Author: fhanik Date: Fri May 19 11:08:21 2006 New Revision: 407868 URL: http://svn.apache.org/viewvc?rev=407868&view=rev Log: Added a two-phase commit interceptor. It works in such a way that the message is either delivered to all or to none. Of course, if the second message fails partway through, then it won't work.
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java?rev=407868&r1=407867&r2=407868&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/UniqueId.java Fri May 19 11:08:21 2006 @@ -32,6 +32,12 @@ } public UniqueId(byte[] id) { + this.id = id; + } + + public UniqueId(byte[] id, int offset, int length) { + this.id = new byte[length]; + System.arraycopy(id,offset,this.id,0,length); } public int hashCode() { @@ -49,6 +55,10 @@ else result = Arrays.equals(this.id,uid.id); }//end if return result; + } + + public byte[] getBytes() { + return id; } } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java?rev=407868&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java (added) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TwoPhaseCommitInterceptor.java Fri May 19 11:08:21 2006 @@ -0,0 +1,122 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group.interceptors; + +import java.util.HashMap; + +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelMessage; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; +import org.apache.catalina.tribes.group.InterceptorPayload; +import org.apache.catalina.tribes.util.UUIDGenerator; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.UniqueId; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class TwoPhaseCommitInterceptor extends ChannelInterceptorBase { + + public static final byte[] START_DATA = new byte[] {113, 1, -58, 2, -34, -60, 75, -78, -101, -12, 32, -29, 32, 111, -40, 4}; + public static final byte[] END_DATA = new byte[] {54, -13, 90, 110, 47, -31, 75, -24, -81, -29, 36, 52, -58, 77, -110, 56}; + private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(TwoPhaseCommitInterceptor.class); + + protected HashMap messages = new 
HashMap(); + protected long expire = 1000 * 60; //one minute expiration + protected boolean deepclone = true; + + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws + ChannelException { + if (okToProcess(msg.getOptions())) { + super.sendMessage(destination, msg, null); + ChannelMessage confirmation = null; + if ( deepclone ) confirmation = (ChannelMessage)msg.deepclone(); + else confirmation = (ChannelMessage)msg.clone(); + confirmation.getMessage().reset(); + UUIDGenerator.randomUUID(false,confirmation.getUniqueId(),0); + confirmation.getMessage().append(START_DATA,0,START_DATA.length); + confirmation.getMessage().append(msg.getUniqueId(),0,msg.getUniqueId().length); + confirmation.getMessage().append(END_DATA,0,END_DATA.length); + super.sendMessage(destination,confirmation,payload); + } else { + super.sendMessage(destination, msg, payload); + } + } + + public void messageReceived(ChannelMessage msg) { + if (okToProcess(msg.getOptions())) { + if ( msg.getMessage().getLength() == (START_DATA.length+msg.getUniqueId().length+END_DATA.length) && + Arrays.contains(msg.getMessage().getBytesDirect(),0,START_DATA,0,START_DATA.length) && + Arrays.contains(msg.getMessage().getBytesDirect(),START_DATA.length+msg.getUniqueId().length,END_DATA,0,END_DATA.length) ) { + UniqueId id = new UniqueId(msg.getMessage().getBytesDirect(),START_DATA.length,msg.getUniqueId().length); + MapEntry original = (MapEntry)messages.get(id); + if ( original != null ) { + super.messageReceived(original.msg); + messages.remove(id); + } else log.warn("Received a confirmation, but original message is missing. 
Id:"+Arrays.toString(id.getBytes())); + } else { + UniqueId id = new UniqueId(msg.getUniqueId()); + MapEntry entry = new MapEntry((ChannelMessage)msg.deepclone(),id,System.currentTimeMillis()); + messages.put(id,entry); + } + } else { + super.messageReceived(msg); + } + } + + public boolean getDeepclone() { + return deepclone; + } + + public long getExpire() { + return expire; + } + + public void setDeepclone(boolean deepclone) { + this.deepclone = deepclone; + } + + public void setExpire(long expire) { + this.expire = expire; + } + + public static class MapEntry { + public ChannelMessage msg; + public UniqueId id; + public long timestamp; + + public MapEntry(ChannelMessage msg, UniqueId id, long timestamp) { + this.msg = msg; + this.id = id; + this.timestamp = timestamp; + } + public boolean expired(long now, long expiration) { + return (now - timestamp ) > expiration; + } + + } + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=407868&r1=407867&r2=407868&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Fri May 19 11:08:21 2006 @@ -635,15 +635,27 @@ private boolean proxy; private Member[] backupNodes; - private Serializable key; - private Serializable value; + private Object key; + private Object value; - public MapEntry(Serializable key, Serializable value) { + public MapEntry(Object key, Object value) { setKey(key); setValue(value); } - + + public boolean isKeySerializable() { + return (key == null) || (key instanceof Serializable); + 
} + + public boolean isValueSerializable() { + return (value==null) || (value instanceof Serializable); + } + + public boolean isSerializable() { + return isKeySerializable() && isValueSerializable(); + } + public boolean isBackup() { return backup; } Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java?rev=407868&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/util/Arrays.java Fri May 19 11:08:21 2006 @@ -0,0 +1,66 @@ +/* + * Copyright 1999,2004-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.catalina.tribes.util; + +import org.apache.catalina.tribes.UniqueId; +import org.apache.catalina.tribes.ChannelMessage; + +/** + * @author Filip Hanik + * @version 1.0 + */ +public class Arrays { + + public static boolean contains(byte[] source, int srcoffset, byte[] key, int keyoffset, int length) { + if ( srcoffset < 0 || srcoffset >= source.length) throw new ArrayIndexOutOfBoundsException("srcoffset is out of bounds."); + if ( keyoffset < 0 || keyoffset >= key.length) throw new ArrayIndexOutOfBoundsException("keyoffset is out of bounds."); + if ( length >= (key.length-keyoffset) ) throw new ArrayIndexOutOfBoundsException("not enough data elements in the key, length is out of bounds."); + //we don't have enough data to validate it + if ( length >= (source.length-srcoffset) ) return false; + boolean match = true; + int pos = keyoffset; + for ( int i=srcoffset; match && i<length; i++ ) { + match = (source[i] == key[pos++]); + } + return match; + } + + public static String toString(byte[] data) { + return toString(data,0,data.length); + } + + public static String toString(byte[] data, int offset, int length) { + StringBuffer buf = new StringBuffer("{"); + buf.append(data[offset++]); + length--; + for ( int i=offset; i<length; i++ ) { + buf.append(", ").append(data[i]); + } + buf.append("}"); + return buf.toString(); + } + + public static UniqueId getUniqudId(ChannelMessage msg) { + return new UniqueId(msg.getUniqueId()); + } + + public static UniqueId getUniqudId(byte[] data) { + return new UniqueId(data); + } + + + +} \ No newline at end of file Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=407868&r1=407867&r2=407868&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri May 19 
11:08:21 2006 @@ -42,6 +42,10 @@ Code Tasks: =========================================== +48. Periodic refresh of the replicated map (primary ->backup) + +47. Delta(session) versioning. increase version number each time, easier to keep maps in sync + 41. Build a tipi that is a soft membership 38. Make the AbstractReplicatedMap accept non serializable elements, but just don't replicate them @@ -56,8 +60,6 @@ 34. Configurable payload for the membership heartbeat, so that the app can decide what to heartbeat. such as JMX management port, ala Andy Piper's suggestion. -33. PerfectFDInterceptor, when a member is reported missing, first check TCP path too. - 32. Replicated JNDI entries in Tomcat in the format cluster:<map name>/<entry key> for example cluster:myapps/db/shared/dbinfo @@ -287,4 +289,7 @@ 45. McastServiceImpl.receive should have a SO_TIMEOUT so that we can check for members dropping on the same thread +Notes: Completed + +33. TcpFailureDetector, when a member is reported missing, first check TCP path too. Notes: Completed --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]