This is an automated email from the ASF dual-hosted git repository. zregvart pushed a commit to branch issue/CAMEL-12871 in repository https://gitbox.apache.org/repos/asf/camel.git
commit a5d16c02c9c77a65945258a4b5576262323cfd8b Author: Zoran Regvart <zregv...@apache.org> AuthorDate: Mon Dec 14 13:53:50 2020 +0100 CAMEL-12871: release resources on stop (WIP) When SubscriptionHelper is stopped we need to remove all listeners and close channels this helper is listening on. --- .../internal/streaming/SubscriptionHelper.java | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index 625b2f2..e8c3360 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -36,6 +36,7 @@ import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.service.ServiceSupport; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; +import org.cometd.bayeux.client.ClientSessionChannel.MessageListener; import org.cometd.client.BayeuxClient; import org.cometd.client.BayeuxClient.State; import org.cometd.client.transport.ClientTransport; @@ -320,11 +321,25 @@ public class SubscriptionHelper extends ServiceSupport { return exception; } + private void closeChannel(final String name, MessageListener listener) { + final ClientSessionChannel channel = client.getChannel(name); + channel.removeListener(listener); + channel.release(); + } + @Override protected void doStop() throws Exception { - client.getChannel(META_DISCONNECT).removeListener(disconnectListener); - client.getChannel(META_CONNECT).removeListener(connectListener); - client.getChannel(META_HANDSHAKE).removeListener(handshakeListener); + closeChannel(META_DISCONNECT, disconnectListener); + closeChannel(META_CONNECT, connectListener); + closeChannel(META_HANDSHAKE, handshakeListener); + + for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) { + final SalesforceConsumer consumer = entry.getKey(); + final String topic = consumer.getTopicName(); + + final MessageListener listener = entry.getValue(); + closeChannel(getChannelName(topic), listener); + } client.disconnect(); boolean disconnected = client.waitFor(timeout, State.DISCONNECTED);