CAMEL-6396: camel-solr should shut down Solr servers when they are no longer in use.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66328293 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66328293 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66328293 Branch: refs/heads/master Commit: 663282939d49be7a3a0747a7d42c72fc563cf21a Parents: 978b102 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat May 25 08:35:38 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat May 25 08:35:38 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/solr/SolrComponent.java | 101 +++++++++- .../apache/camel/component/solr/SolrEndpoint.java | 165 ++++++++++----- .../apache/camel/component/solr/SolrProducer.java | 20 ++- 3 files changed, 229 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java index b816c97..efde126 100644 --- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java +++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java @@ -16,18 +16,117 @@ */ package org.apache.camel.component.solr; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer; +import org.apache.solr.client.solrj.impl.HttpSolrServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Represents the 
component that manages {@link SolrEndpoint}. */ public class SolrComponent extends DefaultComponent { + private static final transient Logger LOG = LoggerFactory.getLogger(SolrComponent.class); + private final Map<SolrEndpoint, SolrServerReference> servers = new HashMap<SolrEndpoint, SolrServerReference>(); + + protected static final class SolrServerReference { + + private final AtomicInteger referenceCounter = new AtomicInteger(); + private final SolrEndpoint endpoint; + private HttpSolrServer solrServer; + private ConcurrentUpdateSolrServer updateSolrServer; + + SolrServerReference(SolrEndpoint endpoint) { + this.endpoint = endpoint; + } + + public HttpSolrServer getSolrServer() { + return solrServer; + } + + public void setSolrServer(HttpSolrServer solrServer) { + this.solrServer = solrServer; + } + + public ConcurrentUpdateSolrServer getUpdateSolrServer() { + return updateSolrServer; + } + + public void setUpdateSolrServer(ConcurrentUpdateSolrServer updateSolrServer) { + this.updateSolrServer = updateSolrServer; + } + + public int addReference() { + return referenceCounter.incrementAndGet(); + } + + public int decReference() { + return referenceCounter.decrementAndGet(); + } + } + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new SolrEndpoint(uri, this, remaining, parameters); + Endpoint endpoint = new SolrEndpoint(uri, this, remaining); setProperties(endpoint, parameters); return endpoint; } + + public SolrServerReference getSolrServers(SolrEndpoint endpoint) { + return servers.get(endpoint); + } + + public void addSolrServers(SolrEndpoint endpoint, SolrServerReference servers) { + this.servers.put(endpoint, servers); + } + + @Override + protected void doShutdown() throws Exception { + for (SolrServerReference server : servers.values()) { + shutdownServers(server); + } + servers.clear(); + } + + void shutdownServers(SolrServerReference ref) { + shutdownServers(ref, 
false); + } + + void shutdownServers(SolrServerReference ref, boolean remove) { + try { + if (ref.getSolrServer() != null) { + LOG.info("Shutting down solr server: " + ref.getSolrServer()); + ref.getSolrServer().shutdown(); + } + } catch (Exception e) { + LOG.warn("Error shutting down solr server. This exception is ignored.", e); + } + try { + if (ref.getUpdateSolrServer() != null) { + LOG.info("Shutting down update solr server: " + ref.getUpdateSolrServer()); + ref.getUpdateSolrServer().shutdownNow(); + } + } catch (Exception e) { + LOG.warn("Error shutting down streaming solr server. This exception is ignored.", e); + } + + if (remove) { + SolrEndpoint key = null; + for (Map.Entry<SolrEndpoint, SolrServerReference> entry : servers.entrySet()) { + if (entry.getValue() == ref) { + key = entry.getKey(); + break; + } + } + if (key != null) { + servers.remove(key); + } + } + + } } http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java index f6b3703..3f613fa 100644 --- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java +++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java @@ -17,7 +17,6 @@ package org.apache.camel.component.solr; import java.net.URL; -import java.util.Map; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -31,32 +30,81 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer; */ public class SolrEndpoint extends DefaultEndpoint { - private HttpSolrServer solrServer; - private ConcurrentUpdateSolrServer streamingSolrServer; private String requestHandler; - private int streamingThreadCount; - private int 
streamingQueueSize; - - public SolrEndpoint(String endpointUri, SolrComponent component, String address, Map<String, Object> parameters) throws Exception { + private String url; + private int streamingQueueSize = SolrConstants.DEFUALT_STREAMING_QUEUE_SIZE; + private int streamingThreadCount = SolrConstants.DEFAULT_STREAMING_THREAD_COUNT; + private Integer maxRetries; + private Integer soTimeout; + private Integer connectionTimeout; + private Integer defaultMaxConnectionsPerHost; + private Integer maxTotalConnections; + private Boolean followRedirects; + private Boolean allowCompression; + + public SolrEndpoint(String endpointUri, SolrComponent component, String address) throws Exception { super(endpointUri, component); - // check the url address URL url = new URL("http://" + address); - solrServer = new HttpSolrServer(url.toString()); - streamingQueueSize = getIntFromString((String) parameters.get(SolrConstants.PARAM_STREAMING_QUEUE_SIZE), SolrConstants.DEFUALT_STREAMING_QUEUE_SIZE); - streamingThreadCount = getIntFromString((String) parameters.get(SolrConstants.PARAM_STREAMING_THREAD_COUNT), SolrConstants.DEFAULT_STREAMING_THREAD_COUNT); - streamingSolrServer = new ConcurrentUpdateSolrServer(url.toString(), streamingQueueSize, streamingThreadCount); + this.url = url.toString(); } - public static int getIntFromString(String value, int defaultValue) { - if (value != null && value.length() > 0) { - return Integer.parseInt(value); - } - return defaultValue; + @Override + public SolrComponent getComponent() { + return (SolrComponent) super.getComponent(); } @Override public Producer createProducer() throws Exception { - return new SolrProducer(this); + // do we have servers? 
+ SolrComponent.SolrServerReference ref = getComponent().getSolrServers(this); + if (ref == null) { + + // no then create new servers + HttpSolrServer solrServer = new HttpSolrServer(url); + ConcurrentUpdateSolrServer solrStreamingServer = new ConcurrentUpdateSolrServer(url, streamingQueueSize, streamingThreadCount); + + // set the properties on the solr server + if (maxRetries != null) { + solrServer.setMaxRetries(maxRetries); + } + if (soTimeout != null) { + solrServer.setSoTimeout(soTimeout); + } + if (connectionTimeout != null) { + solrServer.setConnectionTimeout(connectionTimeout); + } + if (defaultMaxConnectionsPerHost != null) { + solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost); + } + if (maxTotalConnections != null) { + solrServer.setMaxTotalConnections(maxTotalConnections); + } + if (followRedirects != null) { + solrServer.setFollowRedirects(followRedirects); + } + if (allowCompression != null) { + solrServer.setAllowCompression(allowCompression); + } + + ref = new SolrComponent.SolrServerReference(this); + ref.setSolrServer(solrServer); + ref.setUpdateSolrServer(solrStreamingServer); + + getComponent().addSolrServers(this, ref); + } + + ref.addReference(); + return new SolrProducer(this, ref.getSolrServer(), ref.getUpdateSolrServer()); + } + + protected void onProducerShutdown(SolrProducer producer) { + SolrComponent.SolrServerReference ref = getComponent().getSolrServers(this); + if (ref != null) { + int counter = ref.decReference(); + if (counter <= 0) { + getComponent().shutdownServers(ref, true); + } + } } @Override @@ -69,67 +117,84 @@ public class SolrEndpoint extends DefaultEndpoint { return true; } - public HttpSolrServer getSolrServer() { - return solrServer; + public void setRequestHandler(String requestHandler) { + this.requestHandler = requestHandler; } - public ConcurrentUpdateSolrServer getStreamingSolrServer() { - return streamingSolrServer; + public String getRequestHandler() { + return requestHandler; } - public 
void setStreamingSolrServer(ConcurrentUpdateSolrServer streamingSolrServer) { - this.streamingSolrServer = streamingSolrServer; + public int getStreamingThreadCount() { + return streamingThreadCount; } - public void setMaxRetries(int maxRetries) { - solrServer.setMaxRetries(maxRetries); + public void setStreamingThreadCount(int streamingThreadCount) { + this.streamingThreadCount = streamingThreadCount; } - public void setSoTimeout(int soTimeout) { - solrServer.setSoTimeout(soTimeout); + public int getStreamingQueueSize() { + return streamingQueueSize; } - public void setConnectionTimeout(int connectionTimeout) { - solrServer.setConnectionTimeout(connectionTimeout); + public void setStreamingQueueSize(int streamingQueueSize) { + this.streamingQueueSize = streamingQueueSize; } - public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) { - solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost); + public Integer getMaxRetries() { + return maxRetries; } - public void setMaxTotalConnections(int maxTotalConnections) { - solrServer.setMaxTotalConnections(maxTotalConnections); + public void setMaxRetries(Integer maxRetries) { + this.maxRetries = maxRetries; } - public void setFollowRedirects(boolean followRedirects) { - solrServer.setFollowRedirects(followRedirects); + public Integer getSoTimeout() { + return soTimeout; } - public void setAllowCompression(boolean allowCompression) { - solrServer.setAllowCompression(allowCompression); + public void setSoTimeout(Integer soTimeout) { + this.soTimeout = soTimeout; } - public void setRequestHandler(String requestHandler) { - this.requestHandler = requestHandler; + public Integer getConnectionTimeout() { + return connectionTimeout; } - public String getRequestHandler() { - return requestHandler; + public void setConnectionTimeout(Integer connectionTimeout) { + this.connectionTimeout = connectionTimeout; } - public int getStreamingThreadCount() { - return streamingThreadCount; + public 
Integer getDefaultMaxConnectionsPerHost() { + return defaultMaxConnectionsPerHost; } - public void setStreamingThreadCount(int streamingThreadCount) { - this.streamingThreadCount = streamingThreadCount; + public void setDefaultMaxConnectionsPerHost(Integer defaultMaxConnectionsPerHost) { + this.defaultMaxConnectionsPerHost = defaultMaxConnectionsPerHost; } - public int getStreamingQueueSize() { - return streamingQueueSize; + public Integer getMaxTotalConnections() { + return maxTotalConnections; } - public void setStreamingQueueSize(int streamingQueueSize) { - this.streamingQueueSize = streamingQueueSize; + public void setMaxTotalConnections(Integer maxTotalConnections) { + this.maxTotalConnections = maxTotalConnections; + } + + public Boolean getFollowRedirects() { + return followRedirects; + } + + public void setFollowRedirects(Boolean followRedirects) { + this.followRedirects = followRedirects; } + + public Boolean getAllowCompression() { + return allowCompression; + } + + public void setAllowCompression(Boolean allowCompression) { + this.allowCompression = allowCompression; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java index 1021e73..56193e1 100644 --- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java +++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java @@ -18,10 +18,12 @@ package org.apache.camel.component.solr; import java.io.File; import java.util.Map; + import org.apache.camel.Exchange; import org.apache.camel.WrappedFile; import org.apache.camel.impl.DefaultProducer; -import org.apache.solr.client.solrj.SolrServer; +import 
org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer; +import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.request.DirectXmlRequest; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -32,13 +34,13 @@ import org.apache.solr.common.SolrInputDocument; * The Solr producer. */ public class SolrProducer extends DefaultProducer { - private SolrServer solrServer; - private SolrServer streamingSolrServer; + private HttpSolrServer solrServer; + private ConcurrentUpdateSolrServer streamingSolrServer; - public SolrProducer(SolrEndpoint endpoint) { + public SolrProducer(SolrEndpoint endpoint, HttpSolrServer solrServer, ConcurrentUpdateSolrServer streamingSolrServer) { super(endpoint); - solrServer = endpoint.getSolrServer(); - streamingSolrServer = endpoint.getStreamingSolrServer(); + this.solrServer = solrServer; + this.streamingSolrServer = streamingSolrServer; } @Override @@ -166,4 +168,10 @@ public class SolrProducer extends DefaultProducer { public SolrEndpoint getEndpoint() { return (SolrEndpoint) super.getEndpoint(); } + + @Override + protected void doShutdown() throws Exception { + getEndpoint().onProducerShutdown(this); + } + }