Author: fhanik
Date: Fri Jan 9 15:21:08 2009
New Revision: 733187
URL: http://svn.apache.org/viewvc?rev=733187&view=rev
Log:
Defer the deserialization of the message to an async thread to be able to
handle more incoming messages; even so, I can still send more than I can receive.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java?rev=733187&r1=733186&r2=733187&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastService.java Fri Jan 9 15:21:08 2009
@@ -503,7 +503,6 @@
public boolean accept(ChannelMessage msg) {
return true;
}
-
public void broadcast(ChannelMessage message) throws ChannelException {
if (impl==null || (impl.startLevel & Channel.MBR_TX_SEQ)!=Channel.MBR_TX_SEQ )
throw new ChannelException("Multicast send is not started or enabled.");
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=733187&r1=733186&r2=733187&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Fri Jan 9 15:21:08 2009
@@ -346,19 +346,7 @@
if (XByteBuffer.firstIndexOf(data,0,MemberImpl.TRIBES_MBR_BEGIN)==0) {
memberDataReceived(data);
} else {
- XByteBuffer buffer = new XByteBuffer(data,true);
- if (buffer.countPackages(true)>0) {
- int count = buffer.countPackages();
- ChannelData[] pkgs = new ChannelData[count];
- for (int i=0; i<count; i++) {
- try {
- pkgs[i] = buffer.extractPackage(true);
- }catch (IllegalStateException ise) {
- log.debug("Unable to decode message.",ise);
- }
- }
- memberBroadcastsReceived(pkgs);
- }
+ memberBroadcastsReceived(data);
}
}
@@ -407,28 +395,42 @@
}
}
- private void memberBroadcastsReceived(final ChannelData[] data) {
+ private void memberBroadcastsReceived(final byte[] b) {
if (log.isTraceEnabled()) log.trace("Mcast received broadcasts.");
- Runnable t = new Runnable() {
- public void run() {
- String name = Thread.currentThread().getName();
+ XByteBuffer buffer = new XByteBuffer(b,true);
+ if (buffer.countPackages(true)>0) {
+ int count = buffer.countPackages();
+ final ChannelData[] data = new ChannelData[count];
+ for (int i=0; i<count; i++) {
try {
- Thread.currentThread().setName("Membership-MemberAdded.");
- for (int i=0; i<data.length; i++ ) {
- try {
- if (data[i]!=null) {
- msgservice.messageReceived(data[i]);
+ data[i] = buffer.extractPackage(true);
+ }catch (IllegalStateException ise) {
+ log.debug("Unable to decode message.",ise);
+ }catch (IOException x) {
+ log.debug("Unable to decode message.",x);
+ }
+ }
+ Runnable t = new Runnable() {
+ public void run() {
+ String name = Thread.currentThread().getName();
+ try {
+ Thread.currentThread().setName("Membership-MemberAdded.");
+ for (int i=0; i<data.length; i++ ) {
+ try {
+ if (data[i]!=null && !member.equals(data[i].getAddress())) {
+ msgservice.messageReceived(data[i]);
+ }
+ }catch (Throwable t) {
+ log.error("Unable to receive broadcast message.",t);
}
- }catch (Throwable t) {
- log.error("Unable to receive broadcast message.",t);
}
+ }finally {
+ Thread.currentThread().setName(name);
}
- }finally {
- Thread.currentThread().setName(name);
}
- }
- };
- executor.execute(t);
+ };
+ executor.execute(t);
+ }
}
protected final Object expiredMutex = new Object();
@@ -469,6 +471,7 @@
}
private final Object sendLock = new Object();
+
public void send(boolean checkexpired, DatagramPacket packet) throws IOException{
checkexpired = (checkexpired && (packet==null));
//ignore if we haven't started the sender
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org