CAMEL-6396: camel-solr should shutdown solr servers when no longer in use.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66328293
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66328293
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66328293

Branch: refs/heads/master
Commit: 663282939d49be7a3a0747a7d42c72fc563cf21a
Parents: 978b102
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat May 25 08:35:38 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat May 25 08:35:38 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/solr/SolrComponent.java |  101 +++++++++-
 .../apache/camel/component/solr/SolrEndpoint.java  |  165 ++++++++++-----
 .../apache/camel/component/solr/SolrProducer.java  |   20 ++-
 3 files changed, 229 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java
index b816c97..efde126 100644
--- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java
+++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrComponent.java
@@ -16,18 +16,117 @@
  */
 package org.apache.camel.component.solr;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Represents the component that manages {@link SolrEndpoint}.
  */
 public class SolrComponent extends DefaultComponent {
 
+    private static final transient Logger LOG = LoggerFactory.getLogger(SolrComponent.class);
+    private final Map<SolrEndpoint, SolrServerReference> servers = new HashMap<SolrEndpoint, SolrServerReference>();
+
+    protected  static final class SolrServerReference {
+
+        private final AtomicInteger referenceCounter = new AtomicInteger();
+        private final SolrEndpoint endpoint;
+        private HttpSolrServer solrServer;
+        private ConcurrentUpdateSolrServer updateSolrServer;
+
+        SolrServerReference(SolrEndpoint endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        public HttpSolrServer getSolrServer() {
+            return solrServer;
+        }
+
+        public void setSolrServer(HttpSolrServer solrServer) {
+            this.solrServer = solrServer;
+        }
+
+        public ConcurrentUpdateSolrServer getUpdateSolrServer() {
+            return updateSolrServer;
+        }
+
+        public void setUpdateSolrServer(ConcurrentUpdateSolrServer updateSolrServer) {
+            this.updateSolrServer = updateSolrServer;
+        }
+
+        public int addReference() {
+            return referenceCounter.incrementAndGet();
+        }
+
+        public int decReference() {
+            return referenceCounter.decrementAndGet();
+        }
+    }
+
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        Endpoint endpoint = new SolrEndpoint(uri, this, remaining, parameters);
+        Endpoint endpoint = new SolrEndpoint(uri, this, remaining);
         setProperties(endpoint, parameters);
         return endpoint;
     }
+
+    public SolrServerReference getSolrServers(SolrEndpoint endpoint) {
+        return servers.get(endpoint);
+    }
+
+    public void addSolrServers(SolrEndpoint endpoint, SolrServerReference servers) {
+        this.servers.put(endpoint, servers);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        for (SolrServerReference server : servers.values()) {
+            shutdownServers(server);
+        }
+        servers.clear();
+    }
+
+    void shutdownServers(SolrServerReference ref) {
+        shutdownServers(ref, false);
+    }
+
+    void shutdownServers(SolrServerReference ref, boolean remove) {
+        try {
+            if (ref.getSolrServer() != null) {
+                LOG.info("Shutting down solr server: " + ref.getSolrServer());
+                ref.getSolrServer().shutdown();
+            }
+        } catch (Exception e) {
+            LOG.warn("Error shutting down solr server. This exception is ignored.", e);
+        }
+        try {
+            if (ref.getUpdateSolrServer() != null) {
+                LOG.info("Shutting down update solr server: " + ref.getUpdateSolrServer());
+                ref.getUpdateSolrServer().shutdownNow();
+            }
+        } catch (Exception e) {
+            LOG.warn("Error shutting down streaming solr server. This exception is ignored.", e);
+        }
+
+        if (remove) {
+            SolrEndpoint key = null;
+            for (Map.Entry<SolrEndpoint, SolrServerReference> entry : servers.entrySet()) {
+                if (entry.getValue() == ref) {
+                    key = entry.getKey();
+                    break;
+                }
+            }
+            if (key != null) {
+                servers.remove(key);
+            }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java
index f6b3703..3f613fa 100644
--- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java
+++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrEndpoint.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.solr;
 
 import java.net.URL;
-import java.util.Map;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -31,32 +30,81 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
  */
 public class SolrEndpoint extends DefaultEndpoint {
 
-    private HttpSolrServer solrServer;
-    private ConcurrentUpdateSolrServer streamingSolrServer;
     private String requestHandler;
-    private int streamingThreadCount;
-    private int streamingQueueSize;
-
-    public SolrEndpoint(String endpointUri, SolrComponent component, String address, Map<String, Object> parameters) throws Exception {
+    private String url;
+    private int streamingQueueSize = SolrConstants.DEFUALT_STREAMING_QUEUE_SIZE;
+    private int streamingThreadCount = SolrConstants.DEFAULT_STREAMING_THREAD_COUNT;
+    private Integer maxRetries;
+    private Integer soTimeout;
+    private Integer connectionTimeout;
+    private Integer defaultMaxConnectionsPerHost;
+    private Integer maxTotalConnections;
+    private Boolean followRedirects;
+    private Boolean allowCompression;
+
+    public SolrEndpoint(String endpointUri, SolrComponent component, String address) throws Exception {
         super(endpointUri, component);
-        // check the url address
         URL url = new URL("http://" + address);
-        solrServer = new HttpSolrServer(url.toString());
-        streamingQueueSize = getIntFromString((String) parameters.get(SolrConstants.PARAM_STREAMING_QUEUE_SIZE), SolrConstants.DEFUALT_STREAMING_QUEUE_SIZE);
-        streamingThreadCount = getIntFromString((String) parameters.get(SolrConstants.PARAM_STREAMING_THREAD_COUNT), SolrConstants.DEFAULT_STREAMING_THREAD_COUNT);
-        streamingSolrServer = new ConcurrentUpdateSolrServer(url.toString(), streamingQueueSize, streamingThreadCount);
+        this.url = url.toString();
     }
 
-    public static int getIntFromString(String value, int defaultValue) {
-        if (value != null && value.length() > 0) {
-            return Integer.parseInt(value);
-        }
-        return defaultValue;
+    @Override
+    public SolrComponent getComponent() {
+        return (SolrComponent) super.getComponent();
     }
 
     @Override
     public Producer createProducer() throws Exception {
-        return new SolrProducer(this);
+        // do we have servers?
+        SolrComponent.SolrServerReference ref = getComponent().getSolrServers(this);
+        if (ref == null) {
+
+            // no then create new servers
+            HttpSolrServer solrServer = new HttpSolrServer(url);
+            ConcurrentUpdateSolrServer solrStreamingServer = new ConcurrentUpdateSolrServer(url, streamingQueueSize, streamingThreadCount);
+
+            // set the properties on the solr server
+            if (maxRetries != null) {
+                solrServer.setMaxRetries(maxRetries);
+            }
+            if (soTimeout != null) {
+                solrServer.setSoTimeout(soTimeout);
+            }
+            if (connectionTimeout != null) {
+                solrServer.setConnectionTimeout(connectionTimeout);
+            }
+            if (defaultMaxConnectionsPerHost != null) {
+                solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
+            }
+            if (maxTotalConnections != null) {
+                solrServer.setMaxTotalConnections(maxTotalConnections);
+            }
+            if (followRedirects != null) {
+                solrServer.setFollowRedirects(followRedirects);
+            }
+            if (allowCompression != null) {
+                solrServer.setAllowCompression(allowCompression);
+            }
+
+            ref = new SolrComponent.SolrServerReference(this);
+            ref.setSolrServer(solrServer);
+            ref.setUpdateSolrServer(solrStreamingServer);
+
+            getComponent().addSolrServers(this, ref);
+        }
+
+        ref.addReference();
+        return new SolrProducer(this, ref.getSolrServer(), ref.getUpdateSolrServer());
+    }
+
+    protected void onProducerShutdown(SolrProducer producer) {
+        SolrComponent.SolrServerReference ref = getComponent().getSolrServers(this);
+        if (ref != null) {
+            int counter = ref.decReference();
+            if (counter <= 0) {
+                getComponent().shutdownServers(ref, true);
+            }
+        }
     }
 
     @Override
@@ -69,67 +117,84 @@ public class SolrEndpoint extends DefaultEndpoint {
         return true;
     }
 
-    public HttpSolrServer getSolrServer() {
-        return solrServer;
+    public void setRequestHandler(String requestHandler) {
+        this.requestHandler = requestHandler;
     }
 
-    public ConcurrentUpdateSolrServer getStreamingSolrServer() {
-        return streamingSolrServer;
+    public String getRequestHandler() {
+        return requestHandler;
     }
 
-    public void setStreamingSolrServer(ConcurrentUpdateSolrServer streamingSolrServer) {
-        this.streamingSolrServer = streamingSolrServer;
+    public int getStreamingThreadCount() {
+        return streamingThreadCount;
     }
 
-    public void setMaxRetries(int maxRetries) {
-        solrServer.setMaxRetries(maxRetries);
+    public void setStreamingThreadCount(int streamingThreadCount) {
+        this.streamingThreadCount = streamingThreadCount;
     }
 
-    public void setSoTimeout(int soTimeout) {
-        solrServer.setSoTimeout(soTimeout);
+    public int getStreamingQueueSize() {
+        return streamingQueueSize;
     }
 
-    public void setConnectionTimeout(int connectionTimeout) {
-        solrServer.setConnectionTimeout(connectionTimeout);
+    public void setStreamingQueueSize(int streamingQueueSize) {
+        this.streamingQueueSize = streamingQueueSize;
     }
 
-    public void setDefaultMaxConnectionsPerHost(int defaultMaxConnectionsPerHost) {
-        solrServer.setDefaultMaxConnectionsPerHost(defaultMaxConnectionsPerHost);
+    public Integer getMaxRetries() {
+        return maxRetries;
     }
 
-    public void setMaxTotalConnections(int maxTotalConnections) {
-        solrServer.setMaxTotalConnections(maxTotalConnections);
+    public void setMaxRetries(Integer maxRetries) {
+        this.maxRetries = maxRetries;
     }
 
-    public void setFollowRedirects(boolean followRedirects) {
-        solrServer.setFollowRedirects(followRedirects);
+    public Integer getSoTimeout() {
+        return soTimeout;
     }
 
-    public void setAllowCompression(boolean allowCompression) {
-        solrServer.setAllowCompression(allowCompression);
+    public void setSoTimeout(Integer soTimeout) {
+        this.soTimeout = soTimeout;
     }
 
-    public void setRequestHandler(String requestHandler) {
-        this.requestHandler = requestHandler;
+    public Integer getConnectionTimeout() {
+        return connectionTimeout;
     }
 
-    public String getRequestHandler() {
-        return requestHandler;
+    public void setConnectionTimeout(Integer connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
     }
 
-    public int getStreamingThreadCount() {
-        return streamingThreadCount;
+    public Integer getDefaultMaxConnectionsPerHost() {
+        return defaultMaxConnectionsPerHost;
     }
 
-    public void setStreamingThreadCount(int streamingThreadCount) {
-        this.streamingThreadCount = streamingThreadCount;
+    public void setDefaultMaxConnectionsPerHost(Integer defaultMaxConnectionsPerHost) {
+        this.defaultMaxConnectionsPerHost = defaultMaxConnectionsPerHost;
     }
 
-    public int getStreamingQueueSize() {
-        return streamingQueueSize;
+    public Integer getMaxTotalConnections() {
+        return maxTotalConnections;
     }
 
-    public void setStreamingQueueSize(int streamingQueueSize) {
-        this.streamingQueueSize = streamingQueueSize;
+    public void setMaxTotalConnections(Integer maxTotalConnections) {
+        this.maxTotalConnections = maxTotalConnections;
+    }
+
+    public Boolean getFollowRedirects() {
+        return followRedirects;
+    }
+
+    public void setFollowRedirects(Boolean followRedirects) {
+        this.followRedirects = followRedirects;
     }
+
+    public Boolean getAllowCompression() {
+        return allowCompression;
+    }
+
+    public void setAllowCompression(Boolean allowCompression) {
+        this.allowCompression = allowCompression;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/66328293/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java
index 1021e73..56193e1 100644
--- a/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java
+++ b/components/camel-solr/src/main/java/org/apache/camel/component/solr/SolrProducer.java
@@ -18,10 +18,12 @@ package org.apache.camel.component.solr;
 
 import java.io.File;
 import java.util.Map;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
 import org.apache.solr.client.solrj.request.DirectXmlRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -32,13 +34,13 @@ import org.apache.solr.common.SolrInputDocument;
  * The Solr producer.
  */
 public class SolrProducer extends DefaultProducer {
-    private SolrServer solrServer;
-    private SolrServer streamingSolrServer;
+    private HttpSolrServer solrServer;
+    private ConcurrentUpdateSolrServer streamingSolrServer;
 
-    public SolrProducer(SolrEndpoint endpoint) {
+    public SolrProducer(SolrEndpoint endpoint, HttpSolrServer solrServer, ConcurrentUpdateSolrServer streamingSolrServer) {
         super(endpoint);
-        solrServer = endpoint.getSolrServer();
-        streamingSolrServer = endpoint.getStreamingSolrServer();
+        this.solrServer = solrServer;
+        this.streamingSolrServer = streamingSolrServer;
     }
 
     @Override
@@ -166,4 +168,10 @@ public class SolrProducer extends DefaultProducer {
     public SolrEndpoint getEndpoint() {
         return (SolrEndpoint) super.getEndpoint();
     }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        getEndpoint().onProducerShutdown(this);
+    }
+
 }

Reply via email to