Author: veithen
Date: Sat Jul 3 11:16:06 2010
New Revision: 960200
URL: http://svn.apache.org/viewvc?rev=960200&view=rev
Log:
AXIS2-4759: Applied patch submitted by Grant Patterson. Fixed
ServiceTaskManager to properly close the connection. Also fixed some
concurrency issues.
Modified:
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
Modified:
axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java
URL:
http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java?rev=960200&r1=960199&r2=960200&view=diff
==============================================================================
--- axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java (original)
+++ axis/axis2/java/transports/trunk/modules/jms/src/main/java/org/apache/axis2/transport/jms/ServiceTaskManager.java Sat Jul 3 11:16:06 2010
@@ -229,9 +229,9 @@ public class ServiceTaskManager {
if (sharedConnection != null) {
try {
- sharedConnection.stop();
+ sharedConnection.close();
} catch (JMSException e) {
- logError("Error stopping shared Connection", e);
+ logError("Error closing shared Connection", e);
} finally {
sharedConnection = null;
}
@@ -337,7 +337,7 @@ public class ServiceTaskManager {
/** Is this task idle right now? */
private volatile boolean idle = false;
/** Is this task connected to the JMS provider successfully? */
- private boolean connected = false;
+ private volatile boolean connected = false;
/** As soon as we create a new polling task, add it to the STM for control later */
MessageListenerTask() {
@@ -439,31 +439,36 @@ public class ServiceTaskManager {
}
} finally {
+
+ if (log.isTraceEnabled()) {
+ log.trace("Listener task with Thread ID : " +
Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + "
messages :: " +
+ " isActive : " + isActive() + " maxMessagesPerTask : "
+
+ getMaxMessagesPerTask() + " concurrentConsumers : " +
getConcurrentConsumers() +
+ " idleExecutionCount : " + idleExecutionCount + "
idleTaskExecutionLimit : " +
+ getIdleTaskExecutionLimit());
+ } else if (log.isDebugEnabled()) {
+ log.debug("Listener task with Thread ID : " +
Thread.currentThread().getId() +
+ " is stopping after processing : " + messageCount + "
messages");
+ }
+
+ // Close the consumer and session before decrementing activeTaskCount.
+ // (If we have a shared connection, Qpid deadlocks if the shared connection
+ // is closed on another thread while closing the session)
+ closeConsumer(true);
+ closeSession(true);
+ closeConnection();
+
workerState = STATE_STOPPED;
activeTaskCount--;
synchronized(pollingTasks) {
pollingTasks.remove(this);
}
+
+ // My time is up, so if I am going away, create another
+ scheduleNewTaskIfAppropriate();
}
- if (log.isTraceEnabled()) {
- log.trace("Listener task with Thread ID : " +
Thread.currentThread().getId() +
- " is stopping after processing : " + messageCount + "
messages :: " +
- " isActive : " + isActive() + " maxMessagesPerTask : " +
- getMaxMessagesPerTask() + " concurrentConsumers : " +
getConcurrentConsumers() +
- " idleExecutionCount : " + idleExecutionCount + "
idleTaskExecutionLimit : " +
- getIdleTaskExecutionLimit());
- } else if (log.isDebugEnabled()) {
- log.debug("Listener task with Thread ID : " +
Thread.currentThread().getId() +
- " is stopping after processing : " + messageCount + "
messages");
- }
-
- closeConsumer(true);
- closeSession(true);
- closeConnection();
-
- // My time is up, so if I am going away, create another
- scheduleNewTaskIfAppropriate();
}
/**
@@ -670,20 +675,23 @@ public class ServiceTaskManager {
// Connection is not shared
if (connection == null) {
connection = createConnection();
+ setConnected(true);
}
- } else {
- if (sharedConnection != null) {
- connection = sharedConnection;
- } else {
- synchronized(this) {
- if (sharedConnection == null) {
- sharedConnection = createConnection();
- }
- connection = sharedConnection;
- }
- }
+
+ } else if (connection == null) {
+ // Connection is shared, but may not have been created
+
+ synchronized(ServiceTaskManager.this) {
+ if (sharedConnection == null) {
+ sharedConnection = createConnection();
+ }
+ }
+ connection = sharedConnection;
+ setConnected(true);
+
}
- setConnected(true);
+ // else: Connection is shared and is already referenced by this.connection
+
return connection;
}