This is an automated email from the ASF dual-hosted git repository. zregvart pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 78ea9bb3d2155e6391f238e9da6da8bb826009c1 Author: Zoran Regvart <zregv...@apache.org> AuthorDate: Wed Dec 16 14:41:17 2020 +0100 CAMEL-12871: release resources on stop When SubscriptionHelper is stopped we need to remove all listeners and close channels this helper is listening on. --- .../internal/streaming/SubscriptionHelper.java | 29 +++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 625b2f2..25e363b 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.service.ServiceSupport; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; +import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; import org.cometd.client.BayeuxClient; import org.cometd.client.BayeuxClient.State; import org.cometd.client.transport.ClientTransport; @@ -320,11 +321,33 @@ public class SubscriptionHelper extends ServiceSupport { return exception; } + private void closeChannel(final String name, MessageListener listener) { + if (client == null) { + return; + } + + final ClientSessionChannel channel = client.getChannel(name); + channel.removeListener(listener); + channel.release(); + } + @Override protected void doStop() throws Exception { - client.getChannel(META_DISCONNECT).removeListener(disconnectListener); - client.getChannel(META_CONNECT).removeListener(connectListener); - client.getChannel(META_HANDSHAKE).removeListener(handshakeListener); + closeChannel(META_DISCONNECT, disconnectListener); + closeChannel(META_CONNECT, connectListener); + closeChannel(META_HANDSHAKE, handshakeListener); + + for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) { + final SalesforceConsumer consumer = entry.getKey(); + final String topic = consumer.getTopicName(); + + final MessageListener listener = entry.getValue(); + closeChannel(getChannelName(topic), listener); + } + + if (client == null) { + return; + } client.disconnect(); boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);