Author: pero Date: Wed Oct 18 09:37:42 2006 New Revision: 465293 URL: http://svn.apache.org/viewvc?view=rev&rev=465293 Log: Made recovery more robust.
Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml tomcat/container/tc5.5.x/webapps/docs/changelog.xml Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?view=diff&rev=465293&r1=465292&r2=465293 ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Wed Oct 18 09:37:42 2006 @@ -54,7 +54,7 @@ /** * The descriptive information about this implementation. */ - private static final String info = "FastAsyncSocketSender/3.0"; + private static final String info = "FastAsyncSocketSender/3.1"; // ----------------------------------------------------- Instance Variables @@ -69,6 +69,16 @@ private FastQueueThread queueThread = null; /** + * recover timeout ( default 5 secs) + */ + private long recoverTimeout = 5000; + + /** + * number of recover tries + */ + private int recoverCounter = 5; + + /** * Count number of queue message */ private long inQueueCounter = 0; @@ -229,6 +239,40 @@ } /** + * get current push message recover timeout + * @return current push message recover timeout + */ + public long getRecoverTimeout() { + + return recoverTimeout; + } + + /** + * Set recover timeout (default 5000 msec) + * @param timeout + */ + public void setRecoverTimeout(long timeout) { + recoverTimeout = timeout; + } + + /** + * get current push message recover counter + * @return current push message recover counter + */ + public int getRecoverCounter() { + + return recoverCounter; + } + + /** + * Set recover couner (default 5 ) + * @param counter + */ + public void setRecoverCounter(int counter) { + recoverCounter = counter; + } + + /** * change active the queue Thread priority * @param threadPriority value must be between MIN and MAX Thread Priority * @exception IllegalArgumentException @@ -465,25 +509,62 @@ } /** - * @param entry + * Push all messages from queue to other nodes. Is revovery configured + * make a resends with some waits. + * @param entry list of messages */ protected void pushQueuedMessages(LinkObject entry) { do { int messagesize = 0; + ClusterData data = null ; try { - ClusterData data = (ClusterData) entry.data(); + data = (ClusterData) entry.data(); messagesize = data.getMessage().length; sender.pushMessage(data); } catch (Exception x) { - log.warn(sm.getString( - "AsyncSocketSender.send.error", entry + long rTimeout = sender.getRecoverTimeout() ; + int rCounter = sender.getRecoverCounter() ; + if(data != null && + rTimeout > 0 && + rCounter > 0) { + // wait that network get stabler + int counter = 1; + boolean success = false ; + do { + try { + Thread.sleep(rTimeout*counter); + } catch (Exception sleep) { + } + try { + if(log.isDebugEnabled()) { + log.debug(sm.getString("AsyncSocketSender.send.recover", + entry.getKey(), + new Integer(counter), + new Integer(rCounter), new Long(rTimeout))) ; + } + sender.pushMessage(data); + success = true; + } catch (Exception xx) { + counter++; + } + } while (keepRunning && !success && counter <= rCounter); + + if(!success) { + log.warn(sm.getString( + "AsyncSocketSender.send.error", entry .getKey()), x); - } finally { + } + } else { + log.warn(sm.getString( + "AsyncSocketSender.send.error", entry + .getKey()), x); + } + } finally { outQueueCounter++; decQueuedNrOfBytes(messagesize); } entry = entry.next(); - } while (entry != null); + } while (keepRunning && entry != null); } } Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties?view=diff&rev=465293&r1=465292&r2=465293 ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/LocalStrings.properties Wed Oct 18 09:37:42 2006 @@ -2,6 +2,7 @@ AsyncSocketSender.queue.message=Queue message to [{0}:{1,number,integer}] id=[{2}] size={3} AsyncSocketSender.send.error=Unable to asynchronously send session with id=[{0}] - message will be ignored. AsyncSocketSender.queue.empty=Queue in sender [{0}:{1,number,integer}] returned null element! +AsyncSocketSender.send.recover=Recover queued message id=[{0}] after failure and send again ( current counter={1,number,integer}, max counter={2,number,integer}, timeout={3,number,long}) cluster.mbean.register.already=MBean {0} already registered! FastAsyncSocketSender.setThreadPriority=[{0}:{1,number,integer}] set priority to {2} FastAsyncSocketSender.min.exception=[{0}:{1,number,integer}] new priority {2} < MIN_PRIORITY Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml?view=diff&rev=465293&r1=465292&r2=465293 ============================================================================== --- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml (original) +++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/mbeans-descriptors.xml Wed Oct 18 09:37:42 2006 @@ -620,6 +620,12 @@ description="after send failure make a resend" is="true" type="boolean" /> + <attribute name="recoverTimeout" + description="recover Timeout after push message failure (default 5000 msec)" + type="long" /> + <attribute name="recoverCounter" + description="number of recover tries (default 5)" + type="int" /> <attribute name="connected" is="true" description="socket connected" Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?view=diff&rev=465293&r1=465292&r2=465293 ============================================================================== --- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original) +++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Wed Oct 18 09:37:42 2006 @@ -84,6 +84,14 @@ </fix> </changelog> </subsection> + <subsection name="Cluster"> + <changelog> + <add> + Add better recovery at FastAsyncQueueSender. Made the startegy more robust for temporary connection problems (pero) + </add> + </changelog> + </subsection> + </section> --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]