Author: fhanik Date: Tue Feb 8 20:18:19 2011 New Revision: 1068549 URL: http://svn.apache.org/viewvc?rev=1068549&view=rev Log: https://issues.apache.org/bugzilla/show_bug.cgi?id=50667 Allow a replier to get confirmation if the reply message was sent successfully or if it failed
Added: tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java Added: tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java?rev=1068549&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java Tue Feb 8 20:18:19 2011 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.tribes.group; + +import java.io.Serializable; + +import org.apache.catalina.tribes.ErrorHandler; +import org.apache.catalina.tribes.Member; +/** + * Extension to the {@link RpcCallback} interface. Allows an RPC messenger to get a confirmation if the reply + * was sent successfully to the original sender. 
+ * @author fhanik + * + */ +public interface ExtendedRpcCallback extends RpcCallback { + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender that requested the reply + * @param reason - the reason the reply failed + * @return true if the callback would like to reattempt the reply, false otherwise + */ + public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason); + + /** + * + * @param request - the original message that requested the reply + * @param response - the reply message to the original message + * @param sender - the sender that requested the reply + */ + public void replySucceeded(Serializable request, Serializable response, Member sender); +} Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java?rev=1068549&r1=1068548&r2=1068549&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/RpcChannel.java Tue Feb 8 20:18:19 2011 @@ -24,7 +24,9 @@ import java.util.HashMap; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -126,14 +128,46 @@ public class RpcChannel implements Chann }//synchronized }//end if } else{ + boolean finished = false; + final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : 
null; + boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); Serializable reply = callback.replyRequest(rmsg.message,sender); - rmsg.reply = true; - rmsg.message = reply; - try { - channel.send(new Member[] {sender}, rmsg, - replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); - }catch ( Exception x ) { - log.error("Unable to send back reply in RpcChannel.",x); + while (!finished) { + ErrorHandler handler = null; + final Serializable request = msg; + final Serializable response = reply; + final Member fsender = sender; + if (excallback!=null && asyncReply) { + handler = new ErrorHandler() { + public void handleError(ChannelException x, UniqueId id) { + excallback.replyFailed(request, response, fsender, x); + } + + public void handleCompletion(UniqueId id) { + excallback.replySucceeded(request, response, fsender); + } + }; + } + rmsg.reply = true; + rmsg.message = reply; + try { + if (handler!=null) { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); + } else { + channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + } + finished = true; + if (excallback != null && !asyncReply) { + excallback.replySucceeded(rmsg.message, reply, sender); + } + }catch ( Exception x ) { + if (excallback != null && !asyncReply) { + finished = !excallback.replyFailed(rmsg.message, reply, sender, x); + } else { + finished = true; + log.error("Unable to send back reply in RpcChannel.",x); + } + } } }//end if } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org